Browse Source

Merge master branch.

Chuanyi Li 6 years ago
parent
commit
d4797a8fb9
34 changed files with 2857 additions and 46 deletions
  1. 16 7
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java
  2. 2 1
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  3. 76 0
      example/pom.xml
  4. 144 0
      example/src/main/java/com/alibaba/otter/canal/example/db/AbstractDbClient.java
  5. 488 0
      example/src/main/java/com/alibaba/otter/canal/example/db/CanalConnectorClient.java
  6. 35 0
      example/src/main/java/com/alibaba/otter/canal/example/db/MysqlLoadLauncher.java
  7. 169 0
      example/src/main/java/com/alibaba/otter/canal/example/db/PropertyPlaceholderConfigurer.java
  8. 44 0
      example/src/main/java/com/alibaba/otter/canal/example/db/ServiceLocator.java
  9. 121 0
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/AbstractDbDialect.java
  10. 105 0
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/AbstractSqlTemplate.java
  11. 20 0
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/DbDialect.java
  12. 40 0
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/SqlTemplate.java
  13. 93 0
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/TableType.java
  14. 32 0
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/mysql/MysqlDialect.java
  15. 84 0
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/mysql/MysqlSqlTemplate.java
  16. 207 0
      example/src/main/java/com/alibaba/otter/canal/example/db/mysql/AbstractMysqlClient.java
  17. 23 0
      example/src/main/java/com/alibaba/otter/canal/example/db/mysql/MysqlClient.java
  18. 50 0
      example/src/main/java/com/alibaba/otter/canal/example/db/utils/ByteArrayConverter.java
  19. 326 0
      example/src/main/java/com/alibaba/otter/canal/example/db/utils/DdlUtils.java
  20. 140 0
      example/src/main/java/com/alibaba/otter/canal/example/db/utils/SqlTimestampConverter.java
  21. 315 0
      example/src/main/java/com/alibaba/otter/canal/example/db/utils/SqlUtils.java
  22. 53 0
      example/src/main/resources/client-spring.xml
  23. 16 0
      example/src/main/resources/client.properties
  24. 15 4
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  25. 38 2
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java
  26. 36 20
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  27. 0 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  28. 8 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  29. 19 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DefaultTableMetaTSDBFactory.java
  30. 5 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java
  31. 18 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBFactory.java
  32. 38 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/NoStorageTest.java
  33. 77 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/StorageTest.java
  34. 4 4
      pom.xml

+ 16 - 7
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -490,7 +490,6 @@ public final class RowsLogBuffer {
                     // t % 100);
 
                     StringBuilder builder = new StringBuilder();
-                    builder.append(26);
                     appendNumber4(builder, d / 10000);
                     builder.append('-');
                     appendNumber2(builder, (d % 10000) / 100);
@@ -615,7 +614,13 @@ public final class RowsLogBuffer {
                     if (i32 < 0) {
                         builder.append('-');
                     }
-                    appendNumber2(builder, u32 / 10000);
+
+                    int d = u32 / 10000;
+                    if (d > 100) {
+                        builder.append(String.valueOf(d));
+                    } else {
+                        appendNumber2(builder, d);
+                    }
                     builder.append(':');
                     appendNumber2(builder, (u32 % 10000) / 100);
                     builder.append(':');
@@ -724,7 +729,13 @@ public final class RowsLogBuffer {
                     if (ltime < 0) {
                         builder.append('-');
                     }
-                    appendNumber2(builder, (int) ((intpart >> 12) % (1 << 10)));
+
+                    int d = (int) ((intpart >> 12) % (1 << 10));
+                    if (d > 100) {
+                        builder.append(String.valueOf(d));
+                    } else {
+                        appendNumber2(builder, d);
+                    }
                     builder.append(':');
                     appendNumber2(builder, (int) ((intpart >> 6) % (1 << 6)));
                     builder.append(':');
@@ -1134,7 +1145,7 @@ public final class RowsLogBuffer {
                 .append(digits[(d / 100) % 10])
                 .append(digits[(d / 10) % 10])
                 .append(digits[d % 10]);
-        } else if (d >= 100) {
+        } else {
             builder.append('0');
             appendNumber3(builder, d);
         }
@@ -1142,9 +1153,7 @@ public final class RowsLogBuffer {
 
     private void appendNumber3(StringBuilder builder, int d) {
         if (d >= 100) {
-            builder.append(digits[d / 100])
-                .append(digits[(d / 10) % 10])
-                .append(digits[d % 10]);
+            builder.append(digits[d / 100]).append(digits[(d / 10) % 10]).append(digits[d % 10]);
         } else {
             builder.append('0');
             appendNumber2(builder, d);

+ 2 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -34,6 +34,7 @@ import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
 import com.alibaba.otter.canal.instance.manager.CanalConfigClient;
 import com.alibaba.otter.canal.instance.manager.ManagerCanalInstanceGenerator;
 import com.alibaba.otter.canal.instance.spring.SpringCanalInstanceGenerator;
+import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
@@ -303,7 +304,7 @@ public class CanalController {
                     return instanceGenerator.generate(destination);
                 } else if (config.getMode().isSpring()) {
                     SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
-                    synchronized (this) {
+                    synchronized (CanalEventParser.class) {
                         try {
                             // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                             System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);

+ 76 - 0
example/pom.xml

@@ -21,6 +21,82 @@
 			<artifactId>canal.protocol</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>com.alibaba</groupId>
+			<artifactId>druid</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>mysql</groupId>
+			<artifactId>mysql-connector-java</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.ddlutils</groupId>
+			<artifactId>ddlutils</artifactId>
+			<version>1.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-beanutils</groupId>
+					<artifactId>commons-beanutils-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-lang</groupId>
+					<artifactId>commons-lang</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-dbcp</groupId>
+					<artifactId>commons-dbcp</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-pool</groupId>
+					<artifactId>commons-pool</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-logging</groupId>
+					<artifactId>commons-logging-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>dom4j</groupId>
+					<artifactId>dom4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>stax</groupId>
+					<artifactId>stax-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-collections</groupId>
+					<artifactId>commons-collections</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-digester</groupId>
+					<artifactId>commons-digester</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-betwixt</groupId>
+					<artifactId>commons-betwixt</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-pool2</artifactId>
+			<version>2.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-beanutils</groupId>
+			<artifactId>commons-beanutils</artifactId>
+			<version>1.8.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+			<version>3.7</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-collections</groupId>
+			<artifactId>commons-collections</artifactId>
+			<version>3.2</version>
+		</dependency>
+
 		<!-- test dependency -->
 		<dependency>
 			<groupId>junit</groupId>

+ 144 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/AbstractDbClient.java

@@ -0,0 +1,144 @@
+package com.alibaba.otter.canal.example.db;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+import org.slf4j.MDC;
+
+import java.util.Date;
+import java.util.List;
+
+public abstract class AbstractDbClient extends CanalConnectorClient {
+
+
+    public abstract void insert(CanalEntry.Header header, List<CanalEntry.Column> afterColumns);
+
+    public abstract void update(CanalEntry.Header header, List<CanalEntry.Column> afterColumns);
+
+    public abstract void delete(CanalEntry.Header header, List<CanalEntry.Column> beforeColumns);
+
+
+    @Override
+    public synchronized void start() {
+        if (running) {
+            return;
+        }
+        super.start();
+    }
+
+    @Override
+    public synchronized void stop() {
+        if (!running) {
+            return;
+        }
+        super.stop();
+        MDC.remove("destination");
+    }
+
+    @Override
+    protected void processMessage(Message message) {
+        long batchId = message.getId();
+        //遍历每条消息
+        for (CanalEntry.Entry entry : message.getEntries()) {
+            session(entry);//no exception
+        }
+        //ack all the time。
+        connector.ack(batchId);
+    }
+
+    private void session(CanalEntry.Entry entry) {
+        CanalEntry.EntryType entryType = entry.getEntryType();
+        int times = 0;
+        boolean success = false;
+        while (!success) {
+            if (times > 0) {
+                /**
+                 * 1:retry,重试,重试默认为3次,由retryTimes参数决定,如果重试次数达到阈值,则跳过,并且记录日志。
+                 * 2:ignore,直接忽略,不重试,记录日志。
+                 */
+                if (exceptionStrategy == ExceptionStrategy.RETRY.code) {
+                    if (times >= retryTimes) {
+                        break;
+                    }
+                } else {
+                    break;
+                }
+            }
+            try {
+                switch (entryType) {
+                    case TRANSACTIONBEGIN:
+                        transactionBegin(entry);
+                        break;
+                    case TRANSACTIONEND:
+                        transactionEnd(entry);
+                        break;
+                    case ROWDATA:
+                        rowData(entry);
+                        break;
+                    default:
+                        break;
+                }
+                success = true;
+            } catch (Exception e) {
+                times++;
+                logger.error("parse event has an error ,times: + " + times + ", data:" + entry.toString(), e);
+            }
+
+        }
+    }
+
+    private void rowData(CanalEntry.Entry entry) throws Exception {
+        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+        CanalEntry.EventType eventType = rowChange.getEventType();
+        CanalEntry.Header header = entry.getHeader();
+        long executeTime = header.getExecuteTime();
+        long delayTime = new Date().getTime() - executeTime;
+        String sql = rowChange.getSql();
+
+        try {
+            if (!isDML(eventType) || rowChange.getIsDdl()) {
+                processDDL(header, eventType, sql);
+                return;
+            }
+            //处理DML数据
+            processDML(header, eventType, rowChange, sql);
+        } catch (Exception e) {
+            logger.error("process event error ,", e);
+            logger.error(rowFormat,
+                    new Object[]{header.getLogfileName(), String.valueOf(header.getLogfileOffset()),
+                            header.getSchemaName(), header.getTableName(), eventType,
+                            String.valueOf(executeTime), String.valueOf(delayTime)});
+            throw e;//重新抛出
+        }
+    }
+
+    /**
+     * 处理 dml 数据
+     *
+     * @param header
+     * @param eventType
+     * @param rowChange
+     * @param sql
+     */
+    protected void processDML(CanalEntry.Header header, CanalEntry.EventType eventType, CanalEntry.RowChange rowChange, String sql) {
+        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+            switch (eventType) {
+                case DELETE:
+                    delete(header, rowData.getBeforeColumnsList());
+                    break;
+                case INSERT:
+                    insert(header, rowData.getAfterColumnsList());
+                    break;
+                case UPDATE:
+                    update(header, rowData.getAfterColumnsList());
+                    break;
+                default:
+                    whenOthers(header, sql);
+            }
+        }
+    }
+
+}
+
+
+
+

+ 488 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/CanalConnectorClient.java

@@ -0,0 +1,488 @@
+package com.alibaba.otter.canal.example.db;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.commons.lang.SystemUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.util.CollectionUtils;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+public abstract class CanalConnectorClient extends AbstractCanalLifeCycle implements InitializingBean {
+
+    protected static final Logger logger = LoggerFactory.getLogger(CanalConnectorClient.class);
+    protected static final String SEP = SystemUtils.LINE_SEPARATOR;
+    protected static String contextFormat;
+    protected static String rowFormat;
+    protected static String transactionFormat;
+    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    static {
+        StringBuilder sb = new StringBuilder();
+        sb.append(SEP)
+                .append("-------------Batch-------------")
+                .append(SEP)
+                .append("* Batch Id: [{}] ,count : [{}] , Mem size : [{}] , Time : {}")
+                .append(SEP)
+                .append("* Start : [{}] ")
+                .append(SEP)
+                .append("* End : [{}] ")
+                .append(SEP)
+                .append("-------------------------------")
+                .append(SEP);
+        contextFormat = sb.toString();
+
+        sb = new StringBuilder();
+        sb.append(SEP)
+                .append("+++++++++++++Row+++++++++++++>>>")
+                .append("binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms")
+                .append(SEP);
+        rowFormat = sb.toString();
+
+        sb = new StringBuilder();
+        sb.append(SEP)
+                .append("===========Transaction {} : {}=======>>>")
+                .append("binlog[{}:{}] , executeTime : {} , delay : {}ms")
+                .append(SEP);
+        transactionFormat = sb.toString();
+    }
+
+    private String zkServers;//cluster
+    private String address;//single,ip:port
+    private String destination;
+    private String username;
+    private String password;
+    private int batchSize = 5 * 1024;
+    private String filter = "";//同canal filter,用于过滤database或者table的相关数据。
+    protected boolean debug = false;//开启debug,会把每条消息的详情打印
+
+    //1:retry,重试,重试默认为3次,由retryTimes参数决定,如果重试次数达到阈值,则跳过,并且记录日志。
+    //2:ignore,直接忽略,不重试,记录日志。
+    protected int exceptionStrategy = 1;
+    protected int retryTimes = 3;
+    protected int waitingTime = 100;//当binlog没有数据时,主线程等待的时间,单位ms,大于0
+
+
+    protected CanalConnector connector;
+    protected Thread thread;
+
+    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("process message has an error", e);
+        }
+    };
+
+    @Override
+    public void afterPropertiesSet() {
+        if (waitingTime <= 0) {
+            throw new IllegalArgumentException("waitingTime must be greater than 0");
+        }
+        if (ExceptionStrategy.codeOf(exceptionStrategy) == null) {
+            throw new IllegalArgumentException("exceptionStrategy is not valid,1 or 2");
+        }
+        start();
+    }
+
+    @Override
+    public void start() {
+        if (running) {
+            return;
+        }
+        super.start();
+        initConnector();
+
+        thread = new Thread(new Runnable() {
+
+            public void run() {
+                process();
+            }
+        });
+
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+    }
+
+    @Override
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        super.stop();
+        quietlyStop(thread);
+    }
+
+    protected void quietlyStop(Thread task) {
+        if (task != null) {
+            task.interrupt();
+            try {
+                task.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    public void process() {
+        int times = 0;
+        while (running) {
+            try {
+                sleepWhenFailed(times);
+                //after block, should check the status of thread.
+                if (!running) {
+                    break;
+                }
+                MDC.put("destination", destination);
+                connector.connect();
+                connector.subscribe(filter);
+                connector.rollback();
+                times = 0;//reset;
+
+                while (running) {
+                    // 获取指定数量的数据,不确认
+                    Message message = connector.getWithoutAck(batchSize);
+
+                    long batchId = message.getId();
+                    int size = message.getEntries().size();
+
+                    if (batchId == -1 || size == 0) {
+                        try {
+                            Thread.sleep(waitingTime);
+                        } catch (InterruptedException e) {
+                            //
+                        }
+                        continue;
+                    }
+                    //logger
+                    printBatch(message, batchId);
+
+                    processMessage(message);
+
+                }
+            } catch (Exception e) {
+                logger.error("process error!", e);
+                if (times > 20) {
+                    times = 0;
+                }
+                times++;
+            } finally {
+                connector.disconnect();
+                MDC.remove("destination");
+            }
+        }
+    }
+
+    protected abstract void processMessage(Message message);
+
+
+    private void initConnector() {
+        if (zkServers != null && zkServers.length() > 0) {
+            connector = CanalConnectors.newClusterConnector(zkServers, destination, username, password);
+        } else if (address != null) {
+            String[] segments = address.split(":");
+            SocketAddress socketAddress = new InetSocketAddress(segments[0], Integer.valueOf(segments[1]));
+            connector = CanalConnectors.newSingleConnector(socketAddress, destination, username, password);
+        } else {
+            throw new IllegalArgumentException("zkServers or address cant be null at same time,you should specify one of them!");
+        }
+
+    }
+
+    /**
+     * 用于控制当连接异常时,重试的策略,我们不应该每次都是立即重试,否则将可能导致大量的错误,在空转时导致CPU过高的问题
+     * sleep策略基于简单的累加
+     *
+     * @param times
+     */
+    private void sleepWhenFailed(int times) {
+        if (times <= 0) {
+            return;
+        }
+        try {
+            int sleepTime = 1000 + times * 100;//最大sleep 3s。
+            Thread.sleep(sleepTime);
+        } catch (Exception ex) {
+            //
+        }
+    }
+
+    /**
+     * 打印当前batch的摘要信息
+     *
+     * @param message
+     * @param batchId
+     */
+    protected void printBatch(Message message, long batchId) {
+        if (!debug) {
+            return;
+        }
+        List<CanalEntry.Entry> entries = message.getEntries();
+        if (CollectionUtils.isEmpty(entries)) {
+            return;
+        }
+
+        long memSize = 0;
+        for (CanalEntry.Entry entry : entries) {
+            memSize += entry.getHeader().getEventLength();
+        }
+        int size = entries.size();
+        String startPosition = buildPosition(entries.get(0));
+        String endPosition = buildPosition(message.getEntries().get(size - 1));
+
+        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
+        logger.info(contextFormat, new Object[]{batchId, size, memSize, format.format(new Date()), startPosition, endPosition});
+    }
+
+    protected String buildPosition(CanalEntry.Entry entry) {
+        CanalEntry.Header header = entry.getHeader();
+        long time = header.getExecuteTime();
+        Date date = new Date(time);
+        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
+        StringBuilder sb = new StringBuilder();
+        sb.append(header.getLogfileName())
+                .append(":")
+                .append(header.getLogfileOffset())
+                .append(":")
+                .append(header.getExecuteTime())
+                .append("(")
+                .append(format.format(date))
+                .append(")");
+        return sb.toString();
+    }
+
+    /**
+     * default,only logging information
+     *
+     * @param entry
+     */
+    protected void transactionBegin(CanalEntry.Entry entry) {
+        if (!debug) {
+            return;
+        }
+        try {
+            CanalEntry.TransactionBegin begin = CanalEntry.TransactionBegin.parseFrom(entry.getStoreValue());
+            // 打印事务头信息,执行的线程id,事务耗时
+            CanalEntry.Header header = entry.getHeader();
+            long executeTime = header.getExecuteTime();
+            long delayTime = new Date().getTime() - executeTime;
+            logger.info(transactionFormat,
+                    new Object[]{"begin", begin.getTransactionId(), header.getLogfileName(),
+                            String.valueOf(header.getLogfileOffset()),
+                            String.valueOf(header.getExecuteTime()), String.valueOf(delayTime)});
+        } catch (Exception e) {
+            logger.error("parse event has an error , data:" + entry.toString(), e);
+        }
+    }
+
+    protected void transactionEnd(CanalEntry.Entry entry) {
+        if (!debug) {
+            return;
+        }
+        try {
+            CanalEntry.TransactionEnd end = CanalEntry.TransactionEnd.parseFrom(entry.getStoreValue());
+            // 打印事务提交信息,事务id
+            CanalEntry.Header header = entry.getHeader();
+            long executeTime = header.getExecuteTime();
+            long delayTime = new Date().getTime() - executeTime;
+
+            logger.info(transactionFormat,
+                    new Object[]{"end", end.getTransactionId(), header.getLogfileName(),
+                            String.valueOf(header.getLogfileOffset()),
+                            String.valueOf(header.getExecuteTime()), String.valueOf(delayTime)});
+        } catch (Exception e) {
+            logger.error("parse event has an error , data:" + entry.toString(), e);
+        }
+    }
+
+    /**
+     * 判断事件类型为DML 数据
+     *
+     * @param eventType
+     * @return
+     */
+    protected boolean isDML(CanalEntry.EventType eventType) {
+        switch (eventType) {
+            case INSERT:
+            case UPDATE:
+            case DELETE:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * 处理 DDL数据
+     *
+     * @param header
+     * @param eventType
+     * @param sql
+     */
+
+    protected void processDDL(CanalEntry.Header header, CanalEntry.EventType eventType, String sql) {
+        if (!debug) {
+            return;
+        }
+        String table = header.getSchemaName() + "." + header.getTableName();
+        //对于DDL,直接执行,因为没有行变更数据
+        switch (eventType) {
+            case CREATE:
+                logger.warn("parse create table event, table: {}, sql: {}", table, sql);
+                return;
+            case ALTER:
+                logger.warn("parse alter table event, table: {}, sql: {}", table, sql);
+                return;
+            case TRUNCATE:
+                logger.warn("parse truncate table event, table: {}, sql: {}", table, sql);
+                return;
+            case ERASE:
+            case QUERY:
+                logger.warn("parse event : {}, sql: {} . ignored!", eventType.name(), sql);
+                return;
+            case RENAME:
+                logger.warn("parse rename table event, table: {}, sql: {}", table, sql);
+                return;
+            case CINDEX:
+                logger.warn("parse create index event, table: {}, sql: {}", table, sql);
+                return;
+            case DINDEX:
+                logger.warn("parse delete index event, table: {}, sql: {}", table, sql);
+                return;
+            default:
+                logger.warn("parse unknown event: {}, table: {}, sql: {}", new String[]{eventType.name(), table, sql});
+                break;
+        }
+    }
+
+    /**
+     * 强烈建议捕获异常,非上述已列出的其他操作,非核心
+     * 除了“insert”、“update”、“delete”操作之外的,其他类型的操作.
+     * 默认实现为“无操作”
+     *
+     * @param header 可以从header中获得schema、table的名称
+     * @param sql
+     */
+    public void whenOthers(CanalEntry.Header header, String sql) {
+        String schema = header.getSchemaName();
+        String table = header.getTableName();
+        logger.error("ignore event,schema: {},table: {},SQL: {}", new String[]{schema, table, sql});
+    }
+
+    public enum ExceptionStrategy {
+        RETRY(1), IGNORE(2);
+        public int code;
+
+        ExceptionStrategy(int code) {
+            this.code = code;
+        }
+
+        public static ExceptionStrategy codeOf(Integer code) {
+            if (code != null) {
+                for (ExceptionStrategy e : ExceptionStrategy.values()) {
+                    if (e.code == code) {
+                        return e;
+                    }
+                }
+            }
+            return null;
+        }
+    }
+
+    public String getZkServers() {
+        return zkServers;
+    }
+
+    public void setZkServers(String zkServers) {
+        this.zkServers = zkServers;
+    }
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public String getFilter() {
+        return filter;
+    }
+
+    public void setFilter(String filter) {
+        this.filter = filter;
+    }
+
+    public boolean isDebug() {
+        return debug;
+    }
+
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    public int getExceptionStrategy() {
+        return exceptionStrategy;
+    }
+
+    public void setExceptionStrategy(int exceptionStrategy) {
+        this.exceptionStrategy = exceptionStrategy;
+    }
+
+    public int getRetryTimes() {
+        return retryTimes;
+    }
+
+    public void setRetryTimes(int retryTimes) {
+        this.retryTimes = retryTimes;
+    }
+
+    public int getWaitingTime() {
+        return waitingTime;
+    }
+
+    public void setWaitingTime(int waitingTime) {
+        this.waitingTime = waitingTime;
+    }
+}

+ 35 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/MysqlLoadLauncher.java

@@ -0,0 +1,35 @@
+package com.alibaba.otter.canal.example.db;
+
+import com.alibaba.otter.canal.example.db.mysql.MysqlClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MysqlLoadLauncher {
+    private static final Logger logger = LoggerFactory.getLogger(MysqlLoadLauncher.class);
+
+    public static void main(String[] args) {
+        try {
+            logger.info("## start the canal mysql client.");
+            final MysqlClient client = ServiceLocator.getMysqlClient();
+            logger.info("## the canal consumer is running now ......");
+            client.start();
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the canal consumer");
+                        client.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping canal consumer:\n{}", e);
+                    } finally {
+                        logger.info("## canal consumer is down.");
+                    }
+                }
+
+            });
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the canal consumer:\n{}", e);
+            System.exit(0);
+        }
+    }
+}

+ 169 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/PropertyPlaceholderConfigurer.java

@@ -0,0 +1,169 @@
+package com.alibaba.otter.canal.example.db;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ResourceLoaderAware;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.ResourceLoader;
+import org.springframework.util.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * 扩展Spring的
+ * {@linkplain org.springframework.beans.factory.config.PropertyPlaceholderConfigurer}
+ * ,增加默认值的功能。 例如:${placeholder:defaultValue},假如placeholder的值不存在,则默认取得
+ * defaultValue。
+ * 
+ * @author jianghang 2013-1-24 下午03:37:56
+ * @version 1.0.0
+ */
+public class PropertyPlaceholderConfigurer extends org.springframework.beans.factory.config.PropertyPlaceholderConfigurer implements ResourceLoaderAware, InitializingBean {
+
+    private static final String PLACEHOLDER_PREFIX = "${";
+    private static final String PLACEHOLDER_SUFFIX = "}";
+    private ResourceLoader      loader;
+    private String[]            locationNames;
+
+    public PropertyPlaceholderConfigurer(){
+        setIgnoreUnresolvablePlaceholders(true);
+    }
+
+    public void setResourceLoader(ResourceLoader loader) {
+        this.loader = loader;
+    }
+
+    public void setLocationNames(String[] locations) {
+        this.locationNames = locations;
+    }
+
+    public void afterPropertiesSet() throws Exception {
+        Assert.notNull(loader, "no resourceLoader");
+
+        if (locationNames != null) {
+            for (int i = 0; i < locationNames.length; i++) {
+                locationNames[i] = resolveSystemPropertyPlaceholders(locationNames[i]);
+            }
+        }
+
+        if (locationNames != null) {
+            List<Resource> resources = new ArrayList<Resource>(locationNames.length);
+
+            for (String location : locationNames) {
+                location = trimToNull(location);
+
+                if (location != null) {
+                    resources.add(loader.getResource(location));
+                }
+            }
+
+            super.setLocations(resources.toArray(new Resource[resources.size()]));
+        }
+    }
+
+    private String resolveSystemPropertyPlaceholders(String text) {
+        StringBuilder buf = new StringBuilder(text);
+
+        for (int startIndex = buf.indexOf(PLACEHOLDER_PREFIX); startIndex >= 0;) {
+            int endIndex = buf.indexOf(PLACEHOLDER_SUFFIX, startIndex + PLACEHOLDER_PREFIX.length());
+
+            if (endIndex != -1) {
+                String placeholder = buf.substring(startIndex + PLACEHOLDER_PREFIX.length(), endIndex);
+                int nextIndex = endIndex + PLACEHOLDER_SUFFIX.length();
+
+                try {
+                    String value = resolveSystemPropertyPlaceholder(placeholder);
+
+                    if (value != null) {
+                        buf.replace(startIndex, endIndex + PLACEHOLDER_SUFFIX.length(), value);
+                        nextIndex = startIndex + value.length();
+                    } else {
+                        System.err.println("Could not resolve placeholder '"
+                                           + placeholder
+                                           + "' in ["
+                                           + text
+                                           + "] as system property: neither system property nor environment variable found");
+                    }
+                } catch (Throwable ex) {
+                    System.err.println("Could not resolve placeholder '" + placeholder + "' in [" + text
+                                       + "] as system property: " + ex);
+                }
+
+                startIndex = buf.indexOf(PLACEHOLDER_PREFIX, nextIndex);
+            } else {
+                startIndex = -1;
+            }
+        }
+
+        return buf.toString();
+    }
+
+    private String resolveSystemPropertyPlaceholder(String placeholder) {
+        DefaultablePlaceholder dp = new DefaultablePlaceholder(placeholder);
+        String value = System.getProperty(dp.placeholder);
+
+        if (value == null) {
+            value = System.getenv(dp.placeholder);
+        }
+
+        if (value == null) {
+            value = dp.defaultValue;
+        }
+
+        return value;
+    }
+
+    @Override
+    protected String resolvePlaceholder(String placeholder, Properties props, int systemPropertiesMode) {
+        DefaultablePlaceholder dp = new DefaultablePlaceholder(placeholder);
+        String value = super.resolvePlaceholder(dp.placeholder, props, systemPropertiesMode);
+
+        if (value == null) {
+            value = dp.defaultValue;
+        }
+
+        return trimToEmpty(value);
+    }
+
+    private static class DefaultablePlaceholder {
+
+        private final String defaultValue;
+        private final String placeholder;
+
+        public DefaultablePlaceholder(String placeholder){
+            int commaIndex = placeholder.indexOf(":");
+            String defaultValue = null;
+
+            if (commaIndex >= 0) {
+                defaultValue = trimToEmpty(placeholder.substring(commaIndex + 1));
+                placeholder = trimToEmpty(placeholder.substring(0, commaIndex));
+            }
+
+            this.placeholder = placeholder;
+            this.defaultValue = defaultValue;
+        }
+    }
+
+    private String trimToNull(String str) {
+        if (str == null) {
+            return null;
+        }
+
+        String result = str.trim();
+
+        if (result == null || result.length() == 0) {
+            return null;
+        }
+
+        return result;
+    }
+
+    public static String trimToEmpty(String str) {
+        if (str == null) {
+            return "";
+        }
+
+        return str.trim();
+    }
+}

+ 44 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/ServiceLocator.java

@@ -0,0 +1,44 @@
+package com.alibaba.otter.canal.example.db;
+
+import com.alibaba.otter.canal.example.db.mysql.MysqlClient;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.util.Assert;
+
+public class ServiceLocator implements DisposableBean {
+
+    private static ApplicationContext applicationContext = null;
+
+    static {
+        try {
+            applicationContext = new ClassPathXmlApplicationContext("classpath:client-spring.xml");
+        } catch (RuntimeException e) {
+            throw e;
+        }
+    }
+
+    private static <T> T getBean(String name) {
+        assertContextInjected();
+        return (T) applicationContext.getBean(name);
+    }
+
+
+    private static void clearHolder() {
+        ServiceLocator.applicationContext = null;
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        ServiceLocator.clearHolder();
+    }
+
+    private static void assertContextInjected() {
+        Assert.state(applicationContext != null, "ApplicationContext not set");
+    }
+
+
+    public static MysqlClient getMysqlClient() {
+        return getBean("mysqlClient");
+    }
+}

+ 121 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/AbstractDbDialect.java

@@ -0,0 +1,121 @@
+/*
+ * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.otter.canal.example.db.dialect;
+
+import com.alibaba.otter.canal.example.db.utils.DdlUtils;
+import com.google.common.base.Function;
+import com.google.common.collect.MigrateMap;
+import org.apache.commons.lang.exception.NestableRuntimeException;
+import org.apache.ddlutils.model.Table;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.support.lob.LobHandler;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.support.TransactionTemplate;
+import org.springframework.util.Assert;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractDbDialect implements DbDialect {
+
+    protected int databaseMajorVersion;
+    protected int databaseMinorVersion;
+    protected String databaseName;
+    protected JdbcTemplate jdbcTemplate;
+    protected TransactionTemplate transactionTemplate;
+    protected LobHandler lobHandler;
+    protected Map<List<String>, Table> tables;
+
+    public AbstractDbDialect(final JdbcTemplate jdbcTemplate, LobHandler lobHandler) {
+        this.jdbcTemplate = jdbcTemplate;
+        this.lobHandler = lobHandler;
+        // 初始化transction
+        this.transactionTemplate = new TransactionTemplate();
+        transactionTemplate.setTransactionManager(new DataSourceTransactionManager(jdbcTemplate.getDataSource()));
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+
+        // 初始化一些数据
+        jdbcTemplate.execute(new ConnectionCallback() {
+
+            public Object doInConnection(Connection c) throws SQLException, DataAccessException {
+                DatabaseMetaData meta = c.getMetaData();
+                databaseName = meta.getDatabaseProductName();
+                databaseMajorVersion = meta.getDatabaseMajorVersion();
+                databaseMinorVersion = meta.getDatabaseMinorVersion();
+
+                return null;
+            }
+        });
+
+        initTables(jdbcTemplate);
+    }
+
+    public Table findTable(String schema, String table, boolean useCache) {
+        List<String> key = Arrays.asList(schema, table);
+        if (useCache == false) {
+            tables.remove(key);
+        }
+
+        return tables.get(key);
+    }
+
+    public Table findTable(String schema, String table) {
+        return findTable(schema, table, true);
+    }
+
+    public LobHandler getLobHandler() {
+        return lobHandler;
+    }
+
+    public JdbcTemplate getJdbcTemplate() {
+        return jdbcTemplate;
+    }
+
+    public TransactionTemplate getTransactionTemplate() {
+        return transactionTemplate;
+    }
+
+    private void initTables(final JdbcTemplate jdbcTemplate) {
+        this.tables = MigrateMap.makeComputingMap(new Function<List<String>, Table>() {
+
+            public Table apply(List<String> names) {
+                Assert.isTrue(names.size() == 2);
+                try {
+                    Table table = DdlUtils.findTable(jdbcTemplate, names.get(0), names.get(0), names.get(1));
+                    if (table == null) {
+                        throw new NestableRuntimeException("no found table [" + names.get(0) + "." + names.get(1)
+                                + "] , pls check");
+                    } else {
+                        return table;
+                    }
+                } catch (Exception e) {
+                    throw new NestableRuntimeException("find table [" + names.get(0) + "." + names.get(1) + "] error",
+                            e);
+                }
+            }
+        });
+    }
+
+
+}

+ 105 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/AbstractSqlTemplate.java

@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.otter.canal.example.db.dialect;
+
+/**
+ * 默认的基于标准SQL实现的CRUD sql封装
+ * 
+ * @author jianghang 2011-10-27 下午01:37:00
+ * @version 4.0.0
+ */
+public abstract class AbstractSqlTemplate implements SqlTemplate {
+
+    private static final String DOT = ".";
+
+    public String getSelectSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
+        StringBuilder sql = new StringBuilder("select ");
+        int size = columnNames.length;
+        for (int i = 0; i < size; i++) {
+            sql.append(appendEscape(columnNames[i])).append((i + 1 < size) ? " , " : "");
+        }
+
+        sql.append(" from ").append(getFullName(schemaName, tableName)).append(" where ( ");
+        appendColumnEquals(sql, pkNames, "and");
+        sql.append(" ) ");
+        return sql.toString().intern();// 不使用intern,避免方法区内存消耗过多
+    }
+
+    public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
+        StringBuilder sql = new StringBuilder("update " + getFullName(schemaName, tableName) + " set ");
+        appendColumnEquals(sql, columnNames, ",");
+        sql.append(" where (");
+        appendColumnEquals(sql, pkNames, "and");
+        sql.append(")");
+        return sql.toString().intern(); // 不使用intern,避免方法区内存消耗过多
+    }
+
+    public String getInsertSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
+        StringBuilder sql = new StringBuilder("insert into " + getFullName(schemaName, tableName) + "(");
+        String[] allColumns = new String[pkNames.length + columnNames.length];
+        System.arraycopy(columnNames, 0, allColumns, 0, columnNames.length);
+        System.arraycopy(pkNames, 0, allColumns, columnNames.length, pkNames.length);
+
+        int size = allColumns.length;
+        for (int i = 0; i < size; i++) {
+            sql.append(appendEscape(allColumns[i])).append((i + 1 < size) ? "," : "");
+        }
+
+        sql.append(") values (");
+        appendColumnQuestions(sql, allColumns);
+        sql.append(")");
+        return sql.toString().intern();// intern优化,避免出现大量相同的字符串
+    }
+
+    public String getDeleteSql(String schemaName, String tableName, String[] pkNames) {
+        StringBuilder sql = new StringBuilder("delete from " + getFullName(schemaName, tableName) + " where ");
+        appendColumnEquals(sql, pkNames, "and");
+        return sql.toString().intern();// intern优化,避免出现大量相同的字符串
+    }
+
+    protected String getFullName(String schemaName, String tableName) {
+        StringBuilder sb = new StringBuilder();
+        if (schemaName != null) {
+            sb.append(appendEscape(schemaName)).append(DOT);
+        }
+        sb.append(appendEscape(tableName));
+        return sb.toString().intern();
+    }
+
+    // ================ helper method ============
+
+    protected String appendEscape(String columnName) {
+        return columnName;
+    }
+
+    protected void appendColumnQuestions(StringBuilder sql, String[] columns) {
+        int size = columns.length;
+        for (int i = 0; i < size; i++) {
+            sql.append("?").append((i + 1 < size) ? " , " : "");
+        }
+    }
+
+    protected void appendColumnEquals(StringBuilder sql, String[] columns, String separator) {
+        int size = columns.length;
+        for (int i = 0; i < size; i++) {
+            sql.append(" ").append(appendEscape(columns[i])).append(" = ").append("? ");
+            if (i != size - 1) {
+                sql.append(separator);
+            }
+        }
+    }
+}

+ 20 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/DbDialect.java

@@ -0,0 +1,20 @@
+package com.alibaba.otter.canal.example.db.dialect;
+
+import org.apache.ddlutils.model.Table;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.support.lob.LobHandler;
+import org.springframework.transaction.support.TransactionTemplate;
+
+public interface DbDialect {
+
+    LobHandler getLobHandler();
+
+    JdbcTemplate getJdbcTemplate();
+
+    TransactionTemplate getTransactionTemplate();
+
+    Table findTable(String schema, String table);
+
+    Table findTable(String schema, String table, boolean useCache);
+
+}

+ 40 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/SqlTemplate.java

@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.otter.canal.example.db.dialect;
+
+/**
+ * sql构造模板操作
+ * 
+ * @author jianghang 2011-10-27 下午01:31:15
+ * @version 4.0.0
+ */
+public interface SqlTemplate {
+
+    public String getSelectSql(String schemaName, String tableName, String[] pkNames, String[] columnNames);
+
+    public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames);
+
+    public String getDeleteSql(String schemaName, String tableName, String[] pkNames);
+
+    public String getInsertSql(String schemaName, String tableName, String[] pkNames, String[] columnNames);
+
+    /**
+     * 获取对应的mergeSql
+     */
+    public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames,
+                              String[] viewColumnNames, boolean updatePks);
+}

+ 93 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/TableType.java

@@ -0,0 +1,93 @@
+package com.alibaba.otter.canal.example.db.dialect;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * An enumeration wrapper around JDBC table types.
+ */
+public enum TableType {
+
+    /**
+     * Unknown
+     */
+    unknown,
+
+    /**
+     * System table
+     */
+    system_table,
+
+    /**
+     * Global temporary
+     */
+    global_temporary,
+
+    /**
+     * Local temporary
+     */
+    local_temporary,
+
+    /**
+     * Table
+     */
+    table,
+
+    /**
+     * View
+     */
+    view,
+
+    /**
+     * Alias
+     */
+    alias,
+
+    /**
+     * Synonym
+     */
+    synonym,;
+
+    /**
+     * Converts an array of table types to an array of their corresponding string values.
+     *
+     * @param tableTypes Array of table types
+     * @return Array of string table types
+     */
+    public static String[] toStrings(final TableType[] tableTypes) {
+        if ((tableTypes == null) || (tableTypes.length == 0)) {
+            return new String[0];
+        }
+
+        final List<String> tableTypeStrings = new ArrayList<String>(tableTypes.length);
+
+        for (final TableType tableType : tableTypes) {
+            if (tableType != null) {
+                tableTypeStrings.add(tableType.toString().toUpperCase(Locale.ENGLISH));
+            }
+        }
+
+        return tableTypeStrings.toArray(new String[tableTypeStrings.size()]);
+    }
+
+    /**
+     * Converts an array of string table types to an array of their corresponding enumeration values.
+     *
+     * @param tableTypeStrings Array of string table types
+     * @return Array of table types
+     */
+    public static TableType[] valueOf(final String[] tableTypeStrings) {
+        if ((tableTypeStrings == null) || (tableTypeStrings.length == 0)) {
+            return new TableType[0];
+        }
+
+        final List<TableType> tableTypes = new ArrayList<TableType>(tableTypeStrings.length);
+
+        for (final String tableTypeString : tableTypeStrings) {
+            tableTypes.add(valueOf(tableTypeString.toLowerCase(Locale.ENGLISH)));
+        }
+
+        return tableTypes.toArray(new TableType[tableTypes.size()]);
+    }
+}

+ 32 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/mysql/MysqlDialect.java

@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.otter.canal.example.db.dialect.mysql;
+
+import com.alibaba.otter.canal.example.db.dialect.AbstractDbDialect;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.support.lob.LobHandler;
+
+public class MysqlDialect extends AbstractDbDialect {
+
+    public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) {
+        super(jdbcTemplate, lobHandler);
+    }
+
+    public boolean isEmptyStringNulled() {
+        return false;
+    }
+}

+ 84 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/mysql/MysqlSqlTemplate.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.otter.canal.example.db.dialect.mysql;
+
+import com.alibaba.otter.canal.example.db.dialect.AbstractSqlTemplate;
+
+/**
+ * mysql sql生成模板
+ *
+ * @author jianghang 2011-10-27 下午01:41:20
+ * @version 4.0.0
+ */
+public class MysqlSqlTemplate extends AbstractSqlTemplate {
+
+    private static final String ESCAPE = "`";
+
+    public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames,
+                              String[] viewColumnNames, boolean includePks) {
+        StringBuilder sql = new StringBuilder("insert into " + getFullName(schemaName, tableName) + "(");
+        int size = columnNames.length;
+        for (int i = 0; i < size; i++) {
+            sql.append(appendEscape(columnNames[i])).append(" , ");
+        }
+        size = pkNames.length;
+        for (int i = 0; i < size; i++) {
+            sql.append(appendEscape(pkNames[i])).append((i + 1 < size) ? " , " : "");
+        }
+
+        sql.append(") values (");
+        size = columnNames.length;
+        for (int i = 0; i < size; i++) {
+            sql.append("?").append(" , ");
+        }
+        size = pkNames.length;
+        for (int i = 0; i < size; i++) {
+            sql.append("?").append((i + 1 < size) ? " , " : "");
+        }
+        sql.append(")");
+        sql.append(" on duplicate key update ");
+
+        size = columnNames.length;
+        for (int i = 0; i < size; i++) {
+            sql.append(appendEscape(columnNames[i]))
+                    .append("=values(")
+                    .append(appendEscape(columnNames[i]))
+                    .append(")");
+            if (includePks) {
+                sql.append(" , ");
+            } else {
+                sql.append((i + 1 < size) ? " , " : "");
+            }
+        }
+
+        if (includePks) {
+            // mysql merge sql匹配了uniqe / primary key时都会执行update,所以需要更新pk信息
+            size = pkNames.length;
+            for (int i = 0; i < size; i++) {
+                sql.append(appendEscape(pkNames[i])).append("=values(").append(appendEscape(pkNames[i])).append(")");
+                sql.append((i + 1 < size) ? " , " : "");
+            }
+        }
+
+        return sql.toString().intern();// intern优化,避免出现大量相同的字符串
+    }
+
+    protected String appendEscape(String columnName) {
+        return ESCAPE + columnName + ESCAPE;
+    }
+
+}

+ 207 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/mysql/AbstractMysqlClient.java

@@ -0,0 +1,207 @@
+package com.alibaba.otter.canal.example.db.mysql;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.example.db.AbstractDbClient;
+import com.alibaba.otter.canal.example.db.dialect.DbDialect;
+import com.alibaba.otter.canal.example.db.dialect.mysql.MysqlDialect;
+import com.alibaba.otter.canal.example.db.dialect.mysql.MysqlSqlTemplate;
+import com.alibaba.otter.canal.example.db.dialect.SqlTemplate;
+import com.alibaba.otter.canal.example.db.utils.SqlUtils;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.ddlutils.model.Column;
+import org.apache.ddlutils.model.Table;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.PreparedStatementSetter;
+import org.springframework.jdbc.core.StatementCreatorUtils;
+import org.springframework.jdbc.support.lob.DefaultLobHandler;
+import org.springframework.jdbc.support.lob.LobCreator;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+
+import javax.sql.DataSource;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public abstract class AbstractMysqlClient extends AbstractDbClient {
+
+    private DataSource dataSource;
+
+    private DbDialect dbDialect;
+    private SqlTemplate sqlTemplate;
+
+    protected Integer execute(final CanalEntry.Header header, final List<CanalEntry.Column> columns) {
+        final String sql = getSql(header, columns);
+        final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
+        dbDialect.getTransactionTemplate().execute(new TransactionCallback() {
+
+            public Object doInTransaction(TransactionStatus status) {
+                try {
+                    JdbcTemplate template = dbDialect.getJdbcTemplate();
+                    int affect = template.update(sql, new PreparedStatementSetter() {
+
+                        public void setValues(PreparedStatement ps) throws SQLException {
+                            doPreparedStatement(ps, dbDialect, lobCreator, header, columns);
+                        }
+                    });
+                    return affect;
+                } finally {
+                    lobCreator.close();
+                }
+            }
+        });
+        return 0;
+    }
+
+    private String getSql(CanalEntry.Header header, List<CanalEntry.Column> columns) {
+        List<String> pkNames = new ArrayList<>();
+        List<String> colNames = new ArrayList<>();
+        for (CanalEntry.Column column : columns) {
+            if (column.getIsKey()) {
+                pkNames.add(column.getName());
+            } else {
+                colNames.add(column.getName());
+            }
+        }
+        String sql = "";
+        CanalEntry.EventType eventType = header.getEventType();
+        switch (eventType) {
+            case INSERT:
+                sql = sqlTemplate.getInsertSql(header.getSchemaName(), header.getTableName(), pkNames.toArray(new String[]{}), colNames.toArray(new String[]{}));
+                break;
+            case UPDATE:
+                sql = sqlTemplate.getUpdateSql(header.getSchemaName(), header.getTableName(), pkNames.toArray(new String[]{}), colNames.toArray(new String[]{}));
+                break;
+            case DELETE:
+                sql = sqlTemplate.getDeleteSql(header.getSchemaName(), header.getTableName(), pkNames.toArray(new String[]{}));
+        }
+        logger.info("Execute sql: {}", sql);
+        return sql;
+    }
+
+    private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobCreator lobCreator,
+                                     CanalEntry.Header header, List<CanalEntry.Column> columns) throws SQLException {
+
+        List<CanalEntry.Column> rebuildColumns = new ArrayList<>(columns.size());
+
+        List<CanalEntry.Column> keyColumns = new ArrayList<>(columns.size());
+        List<CanalEntry.Column> notKeyColumns = new ArrayList<>(columns.size());
+        for (CanalEntry.Column column : columns) {
+            if (column.getIsKey()) {
+                keyColumns.add(column);
+            } else {
+                notKeyColumns.add(column);
+            }
+        }
+        CanalEntry.EventType eventType = header.getEventType();
+        switch (eventType) {
+            case INSERT:
+            case UPDATE:
+                // insert/update语句对应的字段数序都是将主键排在后面
+                rebuildColumns.addAll(notKeyColumns);
+                rebuildColumns.addAll(keyColumns);
+                break;
+            case DELETE:
+                rebuildColumns.addAll(keyColumns);
+        }
+
+        // 获取一下当前字段名的数据是否必填
+        Table table = dbDialect.findTable(header.getSchemaName(), header.getTableName());
+        Map<String, Boolean> isRequiredMap = new HashMap();
+        for (Column tableColumn : table.getColumns()) {
+            isRequiredMap.put(StringUtils.lowerCase(tableColumn.getName()), tableColumn.isRequired());
+        }
+
+        List<Object> values = new ArrayList<>(rebuildColumns.size());
+        for (int i = 0; i < rebuildColumns.size(); i++) {
+            int paramIndex = i + 1;
+            CanalEntry.Column column = rebuildColumns.get(i);
+            int sqlType = column.getSqlType();
+
+            Boolean isRequired = isRequiredMap.get(StringUtils.lowerCase(column.getName()));
+            if (isRequired == null) {
+                // 清理一下目标库的表结构,二次检查一下
+                table = dbDialect.findTable(header.getSchemaName(), header.getTableName());
+
+                isRequiredMap = new HashMap<>();
+                for (Column tableColumn : table.getColumns()) {
+                    isRequiredMap.put(StringUtils.lowerCase(tableColumn.getName()), tableColumn.isRequired());
+                }
+
+                isRequired = isRequiredMap.get(StringUtils.lowerCase(column.getName()));
+                if (isRequired == null) {
+                    throw new CanalClientException(String.format("column name %s is not found in Table[%s]",
+                            column.getName(),
+                            table.toString()));
+                }
+            }
+
+            Object param;
+            if (sqlType == Types.TIME || sqlType == Types.TIMESTAMP || sqlType == Types.DATE) {
+                // 解决mysql的0000-00-00 00:00:00问题,直接依赖mysql
+                // driver进行处理,如果转化为Timestamp会出错
+                param = column.getValue();
+                if (param instanceof String && StringUtils.isEmpty(String.valueOf(param))) {
+                    param = null;
+                }
+            } else {
+                param = SqlUtils.stringToSqlValue(column.getValue(),
+                        sqlType,
+                        isRequired,
+                        column.getIsNull());
+            }
+
+            try {
+                switch (sqlType) {
+                    case Types.CLOB:
+                        lobCreator.setClobAsString(ps, paramIndex, (String) param);
+                        break;
+                    case Types.BLOB:
+                        lobCreator.setBlobAsBytes(ps, paramIndex, (byte[]) param);
+                        break;
+                    case Types.TIME:
+                    case Types.TIMESTAMP:
+                    case Types.DATE:
+                        ps.setObject(paramIndex, param);
+                        break;
+                    case Types.BIT:
+                        StatementCreatorUtils.setParameterValue(ps, paramIndex, Types.DECIMAL, null, param);
+                        break;
+                    default:
+                        StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
+                        break;
+                }
+                values.add(param);
+            } catch (SQLException ex) {
+                logger.error("## SetParam error , [sqltype={}, value={}]",
+                        new Object[]{sqlType, param});
+                throw ex;
+            }
+        }
+        logger.info("## sql values: {}", JSON.toJSONString(values));
+    }
+
+    @Override
+    public void afterPropertiesSet() {
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
+        DefaultLobHandler lobHandler = new DefaultLobHandler();
+        lobHandler.setStreamAsLob(true);
+        dbDialect = new MysqlDialect(jdbcTemplate, lobHandler);
+        sqlTemplate = new MysqlSqlTemplate();
+    }
+
+    public DataSource getDataSource() {
+        return dataSource;
+    }
+
+    public void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+}

+ 23 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/mysql/MysqlClient.java

@@ -0,0 +1,23 @@
+package com.alibaba.otter.canal.example.db.mysql;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+
+import java.util.List;
+
+public class MysqlClient extends AbstractMysqlClient {
+
+    @Override
+    public void insert(CanalEntry.Header header, List<CanalEntry.Column> afterColumns) {
+        execute(header, afterColumns);
+    }
+
+    @Override
+    public void update(CanalEntry.Header header, List<CanalEntry.Column> afterColumns) {
+        execute(header, afterColumns);
+    }
+
+    @Override
+    public void delete(CanalEntry.Header header, List<CanalEntry.Column> beforeColumns) {
+        execute(header, beforeColumns);
+    }
+}

+ 50 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/utils/ByteArrayConverter.java

@@ -0,0 +1,50 @@
+package com.alibaba.otter.canal.example.db.utils;
+
+import org.apache.commons.beanutils.ConversionException;
+import org.apache.commons.beanutils.Converter;
+import org.apache.commons.beanutils.converters.ArrayConverter;
+import org.apache.commons.beanutils.converters.ByteConverter;
+
+public class ByteArrayConverter implements Converter {
+
+    public static final Converter SQL_BYTES = new ByteArrayConverter(null);
+    private static final Converter converter = new ArrayConverter(byte[].class, new ByteConverter());
+
+    protected final Object defaultValue;
+    protected final boolean useDefault;
+
+    public ByteArrayConverter() {
+        this.defaultValue = null;
+        this.useDefault = false;
+    }
+
+    public ByteArrayConverter(Object defaultValue) {
+        this.defaultValue = defaultValue;
+        this.useDefault = true;
+    }
+
+    public Object convert(Class type, Object value) {
+        if (value == null) {
+            if (useDefault) {
+                return (defaultValue);
+            } else {
+                throw new ConversionException("No value specified");
+            }
+        }
+
+        if (value instanceof byte[]) {
+            return (value);
+        }
+
+        // BLOB类型,canal直接存储为String("ISO-8859-1")
+        if (value instanceof String) {
+            try {
+                return ((String) value).getBytes("ISO-8859-1");
+            } catch (Exception e) {
+                throw new ConversionException(e);
+            }
+        }
+
+        return converter.convert(type, value); // byteConvertor进行转化
+    }
+}

+ 326 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/utils/DdlUtils.java

@@ -0,0 +1,326 @@
+package com.alibaba.otter.canal.example.db.utils;
+
+import com.alibaba.otter.canal.example.db.dialect.TableType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.ddlutils.model.Column;
+import org.apache.ddlutils.model.Table;
+import org.apache.ddlutils.platform.DatabaseMetaDataWrapper;
+import org.apache.ddlutils.platform.MetaDataColumnDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.support.JdbcUtils;
+
+import java.sql.*;
+import java.util.*;
+
+
+public class DdlUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(DdlUtils.class);
+    private static TableType[] SUPPORTED_TABLE_TYPES = new TableType[]{TableType.view, TableType.table};
+    private final static Map<Integer, String> _defaultSizes = new HashMap<Integer, String>();
+
+    static {
+        _defaultSizes.put(new Integer(1), "254");
+        _defaultSizes.put(new Integer(12), "254");
+        _defaultSizes.put(new Integer(-1), "254");
+        _defaultSizes.put(new Integer(-2), "254");
+        _defaultSizes.put(new Integer(-3), "254");
+        _defaultSizes.put(new Integer(-4), "254");
+        _defaultSizes.put(new Integer(4), "32");
+        _defaultSizes.put(new Integer(-5), "64");
+        _defaultSizes.put(new Integer(7), "7,0");
+        _defaultSizes.put(new Integer(6), "15,0");
+        _defaultSizes.put(new Integer(8), "15,0");
+        _defaultSizes.put(new Integer(3), "15,15");
+        _defaultSizes.put(new Integer(2), "15,15");
+    }
+
+
+    public static Table findTable(final JdbcTemplate jdbcTemplate, final String catalogName, final String schemaName,
+                                  final String tableName) {
+        return (Table) jdbcTemplate.execute(new ConnectionCallback() {
+
+            public Object doInConnection(Connection con) throws SQLException, DataAccessException {
+                Table table = null;
+                DatabaseMetaDataWrapper metaData = new DatabaseMetaDataWrapper();
+                try {
+
+                    DatabaseMetaData databaseMetaData = con.getMetaData();
+
+                    metaData.setMetaData(databaseMetaData);
+                    metaData.setTableTypes(TableType.toStrings(SUPPORTED_TABLE_TYPES));
+                    metaData.setCatalog(catalogName);
+                    metaData.setSchemaPattern(schemaName);
+
+                    String convertTableName = tableName;
+                    if (databaseMetaData.storesUpperCaseIdentifiers()) {
+                        metaData.setCatalog(catalogName.toUpperCase());
+                        metaData.setSchemaPattern(schemaName.toUpperCase());
+                        convertTableName = tableName.toUpperCase();
+                    }
+                    if (databaseMetaData.storesLowerCaseIdentifiers()) {
+                        metaData.setCatalog(catalogName.toLowerCase());
+                        metaData.setSchemaPattern(schemaName.toLowerCase());
+                        convertTableName = tableName.toLowerCase();
+                    }
+
+                    ResultSet tableData = null;
+                    try {
+                        tableData = metaData.getTables(convertTableName);
+
+                        while ((tableData != null) && tableData.next()) {
+                            Map<String, Object> values = readColumns(tableData, initColumnsForTable());
+
+                            table = readTable(metaData, values);
+                            if (table.getName().equalsIgnoreCase(tableName)) {
+                                break;
+                            }
+                        }
+                    } finally {
+                        JdbcUtils.closeResultSet(tableData);
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+
+                makeAllColumnsPrimaryKeysIfNoPrimaryKeysFound(table);
+                return table;
+            }
+        });
+    }
+
+    /**
+     * Treat tables with no primary keys as a table with all primary keys.
+     */
+    private static void makeAllColumnsPrimaryKeysIfNoPrimaryKeysFound(Table table) {
+        if ((table != null) && (table.getPrimaryKeyColumns() != null) && (table.getPrimaryKeyColumns().length == 0)) {
+            Column[] allCoumns = table.getColumns();
+
+            for (Column column : allCoumns) {
+                column.setPrimaryKey(true);
+            }
+        }
+    }
+
+    private static Table readTable(DatabaseMetaDataWrapper metaData, Map<String, Object> values) throws SQLException {
+        String tableName = (String) values.get("TABLE_NAME");
+        Table table = null;
+
+        if ((tableName != null) && (tableName.length() > 0)) {
+            table = new Table();
+            table.setName(tableName);
+            table.setType((String) values.get("TABLE_TYPE"));
+            table.setCatalog((String) values.get("TABLE_CAT"));
+            table.setSchema((String) values.get("TABLE_SCHEM"));
+            table.setDescription((String) values.get("REMARKS"));
+            table.addColumns(readColumns(metaData, tableName));
+
+            Collection<String> primaryKeys = readPrimaryKeyNames(metaData, tableName);
+
+            for (Object key : primaryKeys) {
+                Column col = table.findColumn((String) key, true);
+
+                if (col != null) {
+                    col.setPrimaryKey(true);
+                } else {
+                    throw new NullPointerException(String.format("%s pk %s is null - %s %s",
+                            tableName,
+                            key,
+                            ToStringBuilder.reflectionToString(metaData, ToStringStyle.SIMPLE_STYLE),
+                            ToStringBuilder.reflectionToString(values, ToStringStyle.SIMPLE_STYLE)));
+                }
+            }
+        }
+
+        return table;
+    }
+
+    private static List<MetaDataColumnDescriptor> initColumnsForTable() {
+        List<MetaDataColumnDescriptor> result = new ArrayList<MetaDataColumnDescriptor>();
+
+        result.add(new MetaDataColumnDescriptor("TABLE_NAME", Types.VARCHAR));
+        result.add(new MetaDataColumnDescriptor("TABLE_TYPE", Types.VARCHAR, "UNKNOWN"));
+        result.add(new MetaDataColumnDescriptor("TABLE_CAT", Types.VARCHAR));
+        result.add(new MetaDataColumnDescriptor("TABLE_SCHEM", Types.VARCHAR));
+        result.add(new MetaDataColumnDescriptor("REMARKS", Types.VARCHAR));
+
+        return result;
+    }
+
+    private static List<MetaDataColumnDescriptor> initColumnsForColumn() {
+        List<MetaDataColumnDescriptor> result = new ArrayList<MetaDataColumnDescriptor>();
+
+        // As suggested by Alexandre Borgoltz, we're reading the COLUMN_DEF
+        // first because Oracle
+        // has problems otherwise (it seemingly requires a LONG column to be the
+        // first to be read)
+        // See also DDLUTILS-29
+        result.add(new MetaDataColumnDescriptor("COLUMN_DEF", Types.VARCHAR));
+
+        // we're also reading the table name so that a model reader impl can
+        // filter manually
+        result.add(new MetaDataColumnDescriptor("TABLE_NAME", Types.VARCHAR));
+        result.add(new MetaDataColumnDescriptor("COLUMN_NAME", Types.VARCHAR));
+        result.add(new MetaDataColumnDescriptor("TYPE_NAME", Types.VARCHAR));
+        result.add(new MetaDataColumnDescriptor("DATA_TYPE", Types.INTEGER, new Integer(Types.OTHER)));
+        result.add(new MetaDataColumnDescriptor("NUM_PREC_RADIX", Types.INTEGER, new Integer(10)));
+        result.add(new MetaDataColumnDescriptor("DECIMAL_DIGITS", Types.INTEGER, new Integer(0)));
+        result.add(new MetaDataColumnDescriptor("COLUMN_SIZE", Types.VARCHAR));
+        result.add(new MetaDataColumnDescriptor("IS_NULLABLE", Types.VARCHAR, "YES"));
+        result.add(new MetaDataColumnDescriptor("REMARKS", Types.VARCHAR));
+
+        return result;
+    }
+
+    private static List<MetaDataColumnDescriptor> initColumnsForPK() {
+        List<MetaDataColumnDescriptor> result = new ArrayList<MetaDataColumnDescriptor>();
+
+        result.add(new MetaDataColumnDescriptor("COLUMN_NAME", Types.VARCHAR));
+
+        // we're also reading the table name so that a model reader impl can
+        // filter manually
+        result.add(new MetaDataColumnDescriptor("TABLE_NAME", Types.VARCHAR));
+
+        // the name of the primary key is currently only interesting to the pk
+        // index name resolution
+        result.add(new MetaDataColumnDescriptor("PK_NAME", Types.VARCHAR));
+
+        return result;
+    }
+
+    private static List<Column> readColumns(DatabaseMetaDataWrapper metaData, String tableName) throws SQLException {
+        ResultSet columnData = null;
+
+        try {
+            columnData = metaData.getColumns(tableName, null);
+
+            List<Column> columns = new ArrayList<Column>();
+            Map<String, Object> values;
+
+            for (; columnData.next(); columns.add(readColumn(metaData, values))) {
+                Map<String, Object> tmp = readColumns(columnData, initColumnsForColumn());
+                if (tableName.equalsIgnoreCase((String) tmp.get("TABLE_NAME"))) {
+                    values = tmp;
+                } else {
+                    break;
+                }
+            }
+
+            return columns;
+        } finally {
+            JdbcUtils.closeResultSet(columnData);
+        }
+    }
+
+    private static Column readColumn(DatabaseMetaDataWrapper metaData, Map<String, Object> values) throws SQLException {
+        Column column = new Column();
+
+        column.setName((String) values.get("COLUMN_NAME"));
+        column.setDefaultValue((String) values.get("COLUMN_DEF"));
+        column.setTypeCode(((Integer) values.get("DATA_TYPE")).intValue());
+
+        String typeName = (String) values.get("TYPE_NAME");
+        // column.setType(typeName);
+
+        if ((typeName != null) && typeName.startsWith("TIMESTAMP")) {
+            column.setTypeCode(Types.TIMESTAMP);
+        }
+        // modify 2013-09-25,处理下unsigned
+        if ((typeName != null) && StringUtils.containsIgnoreCase(typeName, "UNSIGNED")) {
+            // 如果为unsigned,往上调大一个量级,避免数据溢出
+            switch (column.getTypeCode()) {
+                case Types.TINYINT:
+                    column.setTypeCode(Types.SMALLINT);
+                    break;
+                case Types.SMALLINT:
+                    column.setTypeCode(Types.INTEGER);
+                    break;
+                case Types.INTEGER:
+                    column.setTypeCode(Types.BIGINT);
+                    break;
+                case Types.BIGINT:
+                    column.setTypeCode(Types.DECIMAL);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        Integer precision = (Integer) values.get("NUM_PREC_RADIX");
+
+        if (precision != null) {
+            column.setPrecisionRadix(precision.intValue());
+        }
+
+        String size = (String) values.get("COLUMN_SIZE");
+
+        if (size == null) {
+            size = (String) _defaultSizes.get(new Integer(column.getTypeCode()));
+        }
+
+        // we're setting the size after the precision and radix in case
+        // the database prefers to return them in the size value
+        column.setSize(size);
+
+        int scale = 0;
+        Object dec_digits = values.get("DECIMAL_DIGITS");
+
+        if (dec_digits instanceof String) {
+            scale = (dec_digits == null) ? 0 : NumberUtils.toInt(dec_digits.toString());
+        } else if (dec_digits instanceof Integer) {
+            scale = (dec_digits == null) ? 0 : (Integer) dec_digits;
+        }
+
+        if (scale != 0) {
+            column.setScale(scale);
+        }
+
+        column.setRequired("NO".equalsIgnoreCase(((String) values.get("IS_NULLABLE")).trim()));
+        column.setDescription((String) values.get("REMARKS"));
+        return column;
+    }
+
+    private static Map<String, Object> readColumns(ResultSet resultSet, List<MetaDataColumnDescriptor> columnDescriptors)
+            throws SQLException {
+        Map<String, Object> values = new HashMap<String, Object>();
+        MetaDataColumnDescriptor descriptor;
+
+        for (Iterator<MetaDataColumnDescriptor> it = columnDescriptors.iterator(); it.hasNext(); values.put(descriptor.getName(),
+                descriptor.readColumn(resultSet))) {
+            descriptor = (MetaDataColumnDescriptor) it.next();
+        }
+
+        return values;
+    }
+
+    private static Collection<String> readPrimaryKeyNames(DatabaseMetaDataWrapper metaData, String tableName)
+            throws SQLException {
+        ResultSet pkData = null;
+
+        try {
+            List<String> pks = new ArrayList<String>();
+            Map<String, Object> values;
+
+            for (pkData = metaData.getPrimaryKeys(tableName); pkData.next(); pks.add(readPrimaryKeyName(metaData,
+                    values))) {
+                values = readColumns(pkData, initColumnsForPK());
+            }
+
+            return pks;
+        } finally {
+            JdbcUtils.closeResultSet(pkData);
+        }
+    }
+
+    private static String readPrimaryKeyName(DatabaseMetaDataWrapper metaData, Map<String, Object> values)
+            throws SQLException {
+        return (String) values.get("COLUMN_NAME");
+    }
+}

+ 140 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/utils/SqlTimestampConverter.java

@@ -0,0 +1,140 @@
+package com.alibaba.otter.canal.example.db.utils;
+
+import org.apache.commons.beanutils.ConversionException;
+import org.apache.commons.beanutils.Converter;
+import org.apache.commons.lang.time.DateFormatUtils;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.ParsePosition;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+
+public class SqlTimestampConverter implements Converter {
+
+    /**
+     * Field description
+     */
+    public static final String[] DATE_FORMATS = new String[]{"yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss",
+            "yyyy-MM-dd hh:mm:ss.fffffffff", "EEE MMM dd HH:mm:ss zzz yyyy",
+            DateFormatUtils.ISO_DATETIME_FORMAT.getPattern(),
+            DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT.getPattern(),
+            DateFormatUtils.SMTP_DATETIME_FORMAT.getPattern(),};
+
+    public static final Converter SQL_TIMESTAMP = new SqlTimestampConverter(null);
+
+    /**
+     * The default value specified to our Constructor, if any.
+     */
+    private final Object defaultValue;
+
+    /**
+     * Should we return the default value on conversion errors?
+     */
+    private final boolean useDefault;
+
+    /**
+     * Create a {@link Converter} that will throw a {@link ConversionException} if a conversion error occurs.
+     */
+    public SqlTimestampConverter() {
+        this.defaultValue = null;
+        this.useDefault = false;
+    }
+
+    /**
+     * Create a {@link Converter} that will return the specified default value if a conversion error occurs.
+     *
+     * @param defaultValue The default value to be returned
+     */
+    public SqlTimestampConverter(Object defaultValue) {
+        this.defaultValue = defaultValue;
+        this.useDefault = true;
+    }
+
+    /**
+     * Convert the specified input object into an output object of the specified type.
+     *
+     * @param type  Data type to which this value should be converted
+     * @param value The input value to be converted
+     * @throws ConversionException if conversion cannot be performed successfully
+     */
+    public Object convert(Class type, Object value) {
+        if (value == null) {
+            if (useDefault) {
+                return (defaultValue);
+            } else {
+                throw new ConversionException("No value specified");
+            }
+        }
+
+        if (value instanceof java.sql.Date && java.sql.Date.class.equals(type)) {
+            return value;
+        } else if (value instanceof java.sql.Time && java.sql.Time.class.equals(type)) {
+            return value;
+        } else if (value instanceof Timestamp && Timestamp.class.equals(type)) {
+            return value;
+        } else {
+            try {
+                if (java.sql.Date.class.equals(type)) {
+                    return new java.sql.Date(convertTimestamp2TimeMillis(value.toString()));
+                } else if (java.sql.Time.class.equals(type)) {
+                    return new java.sql.Time(convertTimestamp2TimeMillis(value.toString()));
+                } else if (Timestamp.class.equals(type)) {
+                    return new Timestamp(convertTimestamp2TimeMillis(value.toString()));
+                } else {
+                    return new Timestamp(convertTimestamp2TimeMillis(value.toString()));
+                }
+            } catch (Exception e) {
+                throw new ConversionException("Value format invalid: " + e.getMessage(), e);
+            }
+        }
+
+    }
+
+    private Long convertTimestamp2TimeMillis(String input) {
+        if (input == null) {
+            return null;
+        }
+
+        try {
+            // 先处理Timestamp类型
+            return Timestamp.valueOf(input).getTime();
+        } catch (Exception nfe) {
+            try {
+                try {
+                    return parseDate(input, DATE_FORMATS, Locale.ENGLISH).getTime();
+                } catch (Exception err) {
+                    return parseDate(input, DATE_FORMATS, Locale.getDefault()).getTime();
+                }
+            } catch (Exception err) {
+                // 最后处理long time的情况
+                return Long.parseLong(input);
+            }
+        }
+    }
+
+    private Date parseDate(String str, String[] parsePatterns, Locale locale) throws ParseException {
+        if ((str == null) || (parsePatterns == null)) {
+            throw new IllegalArgumentException("Date and Patterns must not be null");
+        }
+
+        SimpleDateFormat parser = null;
+        ParsePosition pos = new ParsePosition(0);
+
+        for (int i = 0; i < parsePatterns.length; i++) {
+            if (i == 0) {
+                parser = new SimpleDateFormat(parsePatterns[0], locale);
+            } else {
+                parser.applyPattern(parsePatterns[i]);
+            }
+            pos.setIndex(0);
+            Date date = parser.parse(str, pos);
+            if ((date != null) && (pos.getIndex() == str.length())) {
+                return date;
+            }
+        }
+
+        throw new ParseException("Unable to parse the date: " + str, -1);
+    }
+}

+ 315 - 0
example/src/main/java/com/alibaba/otter/canal/example/db/utils/SqlUtils.java

@@ -0,0 +1,315 @@
+package com.alibaba.otter.canal.example.db.utils;
+
+import org.apache.commons.beanutils.ConvertUtilsBean;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.*;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SqlUtils {
+
+    public static final String REQUIRED_FIELD_NULL_SUBSTITUTE = " ";
+    public static final String SQLDATE_FORMAT = "yyyy-MM-dd";
+    public static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    private static final Map<Integer, Class<?>> sqlTypeToJavaTypeMap = new HashMap<Integer, Class<?>>();
+    private static final ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
+
+    static {
+        // regist Converter
+        convertUtilsBean.register(SqlTimestampConverter.SQL_TIMESTAMP, Date.class);
+        convertUtilsBean.register(SqlTimestampConverter.SQL_TIMESTAMP, Time.class);
+        convertUtilsBean.register(SqlTimestampConverter.SQL_TIMESTAMP, Timestamp.class);
+        convertUtilsBean.register(ByteArrayConverter.SQL_BYTES, byte[].class);
+
+        // bool
+        sqlTypeToJavaTypeMap.put(Types.BOOLEAN, Boolean.class);
+
+        // int
+        sqlTypeToJavaTypeMap.put(Types.TINYINT, Integer.class);
+        sqlTypeToJavaTypeMap.put(Types.SMALLINT, Integer.class);
+        sqlTypeToJavaTypeMap.put(Types.INTEGER, Integer.class);
+
+        // long
+        sqlTypeToJavaTypeMap.put(Types.BIGINT, Long.class);
+        // mysql bit最多64位,无符号
+        sqlTypeToJavaTypeMap.put(Types.BIT, BigInteger.class);
+
+        // decimal
+        sqlTypeToJavaTypeMap.put(Types.REAL, Float.class);
+        sqlTypeToJavaTypeMap.put(Types.FLOAT, Float.class);
+        sqlTypeToJavaTypeMap.put(Types.DOUBLE, Double.class);
+        sqlTypeToJavaTypeMap.put(Types.NUMERIC, BigDecimal.class);
+        sqlTypeToJavaTypeMap.put(Types.DECIMAL, BigDecimal.class);
+
+        // date
+        sqlTypeToJavaTypeMap.put(Types.DATE, Date.class);
+        sqlTypeToJavaTypeMap.put(Types.TIME, Time.class);
+        sqlTypeToJavaTypeMap.put(Types.TIMESTAMP, Timestamp.class);
+
+        // blob
+        sqlTypeToJavaTypeMap.put(Types.BLOB, byte[].class);
+
+        // byte[]
+        sqlTypeToJavaTypeMap.put(Types.REF, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.OTHER, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.ARRAY, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.STRUCT, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.SQLXML, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.BINARY, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.DATALINK, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.DISTINCT, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.VARBINARY, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.JAVA_OBJECT, byte[].class);
+        sqlTypeToJavaTypeMap.put(Types.LONGVARBINARY, byte[].class);
+
+        // String
+        sqlTypeToJavaTypeMap.put(Types.CHAR, String.class);
+        sqlTypeToJavaTypeMap.put(Types.VARCHAR, String.class);
+        sqlTypeToJavaTypeMap.put(Types.LONGVARCHAR, String.class);
+        sqlTypeToJavaTypeMap.put(Types.LONGNVARCHAR, String.class);
+        sqlTypeToJavaTypeMap.put(Types.NCHAR, String.class);
+        sqlTypeToJavaTypeMap.put(Types.NVARCHAR, String.class);
+        sqlTypeToJavaTypeMap.put(Types.NCLOB, String.class);
+        sqlTypeToJavaTypeMap.put(Types.CLOB, String.class);
+    }
+
+    /**
+     * 将指定java.sql.Types的ResultSet value转换成相应的String
+     *
+     * @param rs
+     * @param index
+     * @param sqlType
+     * @return
+     * @throws SQLException
+     */
+    public static String sqlValueToString(ResultSet rs, int index, int sqlType) throws SQLException {
+        Class<?> requiredType = sqlTypeToJavaTypeMap.get(sqlType);
+        if (requiredType == null) {
+            throw new IllegalArgumentException("unknow java.sql.Types - " + sqlType);
+        }
+
+        return getResultSetValue(rs, index, requiredType);
+    }
+
+    /**
+     * sqlValueToString方法的逆向过程
+     *
+     * @param value
+     * @param sqlType
+     * @param isTextRequired
+     * @param isEmptyStringNulled
+     * @return
+     */
+    public static Object stringToSqlValue(String value, int sqlType, boolean isRequired, boolean isEmptyStringNulled) {
+        // 设置变量
+        String sourceValue = value;
+        if (SqlUtils.isTextType(sqlType)) {
+            if ((sourceValue == null) || (StringUtils.isEmpty(sourceValue) && isEmptyStringNulled)) {
+                return isRequired ? REQUIRED_FIELD_NULL_SUBSTITUTE : null;
+            } else {
+                return sourceValue;
+            }
+        } else {
+            if (StringUtils.isEmpty(sourceValue)) {
+                return null;
+            } else {
+                Class<?> requiredType = sqlTypeToJavaTypeMap.get(sqlType);
+                if (requiredType == null) {
+                    throw new IllegalArgumentException("unknow java.sql.Types - " + sqlType);
+                } else if (requiredType.equals(String.class)) {
+                    return sourceValue;
+                } else if (isNumeric(sqlType)) {
+                    return convertUtilsBean.convert(sourceValue.trim(), requiredType);
+                } else {
+                    return convertUtilsBean.convert(sourceValue, requiredType);
+                }
+            }
+        }
+    }
+
+    public static String encoding(String source, int sqlType, String sourceEncoding, String targetEncoding) {
+        switch (sqlType) {
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGNVARCHAR:
+            case Types.CLOB:
+            case Types.NCLOB:
+                if (false == StringUtils.isEmpty(source)) {
+                    String fromEncoding = StringUtils.isBlank(sourceEncoding) ? "UTF-8" : sourceEncoding;
+                    String toEncoding = StringUtils.isBlank(targetEncoding) ? "UTF-8" : targetEncoding;
+
+                    // if (false == StringUtils.equalsIgnoreCase(fromEncoding,
+                    // toEncoding)) {
+                    try {
+                        return new String(source.getBytes(fromEncoding), toEncoding);
+                    } catch (UnsupportedEncodingException e) {
+                        throw new IllegalArgumentException(e.getMessage(), e);
+                    }
+                    // }
+                }
+        }
+
+        return source;
+    }
+
+    /**
+     * Retrieve a JDBC column value from a ResultSet, using the specified value
+     * type.
+     * <p>
+     * Uses the specifically typed ResultSet accessor methods, falling back to
+     * {@link #getResultSetValue(ResultSet, int)} for unknown types.
+     * <p>
+     * Note that the returned value may not be assignable to the specified
+     * required type, in case of an unknown type. Calling code needs to deal
+     * with this case appropriately, e.g. throwing a corresponding exception.
+     *
+     * @param rs           is the ResultSet holding the data
+     * @param index        is the column index
+     * @param requiredType the required value type (may be <code>null</code>)
+     * @return the value object
+     * @throws SQLException if thrown by the JDBC API
+     */
+    private static String getResultSetValue(ResultSet rs, int index, Class<?> requiredType) throws SQLException {
+        if (requiredType == null) {
+            return getResultSetValue(rs, index);
+        }
+
+        Object value = null;
+        boolean wasNullCheck = false;
+
+        // Explicitly extract typed value, as far as possible.
+        if (String.class.equals(requiredType)) {
+            value = rs.getString(index);
+        } else if (boolean.class.equals(requiredType) || Boolean.class.equals(requiredType)) {
+            value = Boolean.valueOf(rs.getBoolean(index));
+            wasNullCheck = true;
+        } else if (byte.class.equals(requiredType) || Byte.class.equals(requiredType)) {
+            value = new Byte(rs.getByte(index));
+            wasNullCheck = true;
+        } else if (short.class.equals(requiredType) || Short.class.equals(requiredType)) {
+            value = new Short(rs.getShort(index));
+            wasNullCheck = true;
+        } else if (int.class.equals(requiredType) || Integer.class.equals(requiredType)) {
+            value = new Long(rs.getLong(index));
+            wasNullCheck = true;
+        } else if (long.class.equals(requiredType) || Long.class.equals(requiredType)) {
+            value = rs.getBigDecimal(index);
+            wasNullCheck = true;
+        } else if (float.class.equals(requiredType) || Float.class.equals(requiredType)) {
+            value = new Float(rs.getFloat(index));
+            wasNullCheck = true;
+        } else if (double.class.equals(requiredType) || Double.class.equals(requiredType)
+                || Number.class.equals(requiredType)) {
+            value = new Double(rs.getDouble(index));
+            wasNullCheck = true;
+        } else if (Time.class.equals(requiredType)) {
+            // try {
+            // value = rs.getTime(index);
+            // } catch (SQLException e) {
+            value = rs.getString(index);// 尝试拿为string对象,0000无法用Time表示
+            // if (value == null && !rs.wasNull()) {
+            // value = "00:00:00"; //
+            // mysql设置了zeroDateTimeBehavior=convertToNull,出现0值时返回为null
+            // }
+            // }
+        } else if (Timestamp.class.equals(requiredType) || Date.class.equals(requiredType)) {
+            // try {
+            // value = convertTimestamp(rs.getTimestamp(index));
+            // } catch (SQLException e) {
+            // 尝试拿为string对象,0000-00-00 00:00:00无法用Timestamp 表示
+            value = rs.getString(index);
+            // if (value == null && !rs.wasNull()) {
+            // value = "0000:00:00 00:00:00"; //
+            // mysql设置了zeroDateTimeBehavior=convertToNull,出现0值时返回为null
+            // }
+            // }
+        } else if (BigDecimal.class.equals(requiredType)) {
+            value = rs.getBigDecimal(index);
+        } else if (BigInteger.class.equals(requiredType)) {
+            value = rs.getBigDecimal(index);
+        } else if (Blob.class.equals(requiredType)) {
+            value = rs.getBlob(index);
+        } else if (Clob.class.equals(requiredType)) {
+            value = rs.getClob(index);
+        } else if (byte[].class.equals(requiredType)) {
+            try {
+                byte[] bytes = rs.getBytes(index);
+                if (bytes == null) {
+                    value = null;
+                } else {
+                    value = new String(bytes, "ISO-8859-1");// 将binary转化为iso-8859-1的字符串
+                }
+            } catch (UnsupportedEncodingException e) {
+                throw new SQLException(e);
+            }
+        } else {
+            // Some unknown type desired -> rely on getObject.
+            value = getResultSetValue(rs, index);
+        }
+
+        // Perform was-null check if demanded (for results that the
+        // JDBC driver returns as primitives).
+        if (wasNullCheck && (value != null) && rs.wasNull()) {
+            value = null;
+        }
+
+        return (value == null) ? null : convertUtilsBean.convert(value);
+    }
+
+    /**
+     * Retrieve a JDBC column value from a ResultSet, using the most appropriate
+     * value type. The returned value should be a detached value object, not
+     * having any ties to the active ResultSet: in particular, it should not be
+     * a Blob or Clob object but rather a byte array respectively String
+     * representation.
+     * <p>
+     * Uses the <code>getObject(index)</code> method, but includes additional
+     * "hacks" to get around Oracle 10g returning a non-standard object for its
+     * TIMESTAMP datatype and a <code>java.sql.Date</code> for DATE columns
+     * leaving out the time portion: These columns will explicitly be extracted
+     * as standard <code>java.sql.Timestamp</code> object.
+     *
+     * @param rs    is the ResultSet holding the data
+     * @param index is the column index
+     * @return the value object
+     * @throws SQLException if thrown by the JDBC API
+     * @see Blob
+     * @see Clob
+     * @see Timestamp
+     */
+    private static String getResultSetValue(ResultSet rs, int index) throws SQLException {
+        Object obj = rs.getObject(index);
+        return (obj == null) ? null : convertUtilsBean.convert(obj);
+    }
+
+    // private static Object convertTimestamp(Timestamp timestamp) {
+    // return (timestamp == null) ? null : timestamp.getTime();
+    // }
+
+    /**
+     * Check whether the given SQL type is numeric.
+     */
+    public static boolean isNumeric(int sqlType) {
+        return (Types.BIT == sqlType) || (Types.BIGINT == sqlType) || (Types.DECIMAL == sqlType)
+                || (Types.DOUBLE == sqlType) || (Types.FLOAT == sqlType) || (Types.INTEGER == sqlType)
+                || (Types.NUMERIC == sqlType) || (Types.REAL == sqlType) || (Types.SMALLINT == sqlType)
+                || (Types.TINYINT == sqlType);
+    }
+
+    public static boolean isTextType(int sqlType) {
+        if (sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.CLOB || sqlType == Types.LONGVARCHAR
+                || sqlType == Types.NCHAR || sqlType == Types.NVARCHAR || sqlType == Types.NCLOB
+                || sqlType == Types.LONGNVARCHAR) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+}

+ 53 - 0
example/src/main/resources/client-spring.xml

@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"
+       default-autowire="byName">
+
+    <bean class="com.alibaba.otter.canal.example.db.PropertyPlaceholderConfigurer" lazy-init="false">
+        <property name="ignoreResourceNotFound" value="true"/>
+        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
+        <property name="locationNames">
+            <list>
+                <value>classpath:client.properties</value>
+            </list>
+        </property>
+    </bean>
+
+    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
+        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
+        <property name="url" value="${target.mysql.url:}"/>
+        <property name="username" value="${target.mysql.dbUsername:canal}"/>
+        <property name="password" value="${target.mysql.dbPassword:canal}"/>
+        <property name="maxActive" value="30"/>
+        <property name="initialSize" value="0"/>
+        <property name="minIdle" value="1"/>
+        <property name="maxWait" value="10000"/>
+        <property name="timeBetweenEvictionRunsMillis" value="60000"/>
+        <property name="minEvictableIdleTimeMillis" value="300000"/>
+        <property name="validationQuery" value="SELECT 1"/>
+        <property name="exceptionSorterClassName" value="com.alibaba.druid.pool.vendor.MySqlExceptionSorter"/>
+        <property name="validConnectionCheckerClassName" value="com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker"/>
+        <property name="testWhileIdle" value="true"/>
+        <property name="testOnBorrow" value="false"/>
+        <property name="testOnReturn" value="false"/>
+        <property name="useUnfairLock" value="true"/>
+    </bean>
+
+    <bean name="canalConnectorClient" class="com.alibaba.otter.canal.example.db.CanalConnectorClient" abstract="true">
+        <property name="zkServers" value="${zk.servers:127.0.0.1:2181}"/>
+        <property name="debug" value="${client.debug:true}"/>
+        <property name="destination" value="${client.destination:example}"/>
+        <property name="username" value="${client.username:canal}"/>
+        <property name="password" value="${client.password:canal}"/>
+        <property name="exceptionStrategy" value="${client.exceptionstrategy:1}"/>
+        <property name="retryTimes" value="${client.retrytimes:3}"/>
+        <property name="filter" value="${client.filter:.*\\..*}"/>
+        <property name="waitingTime" value="${client.waiting.time:10}"/>
+    </bean>
+
+
+    <bean id="mysqlClient" class="com.alibaba.otter.canal.example.db.mysql.MysqlClient" lazy-init="true" parent="canalConnectorClient">
+        <property name="dataSource" ref="dataSource"/>
+    </bean>
+</beans>

+ 16 - 0
example/src/main/resources/client.properties

@@ -0,0 +1,16 @@
+# client 配置
+zk.servers=127.0.0.1:2181
+# 5 * 1024
+client.batch.size=5120
+client.debug=false
+client.destination=example
+client.username=canal
+client.password=canal
+client.exceptionstrategy=1
+client.retrytimes=3
+client.filter=.*\\..*
+
+# 同步目标: mysql 配置
+target.mysql.url=jdbc:mysql://127.0.0.1:4306
+target.mysql.username=root
+target.mysql.password=123456

+ 15 - 4
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,7 +6,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.alibaba.otter.canal.meta.FileMixedMetaManager;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,7 +20,14 @@ import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
 import com.alibaba.otter.canal.instance.manager.model.Canal;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.*;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.DataSourcing;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.HAMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageScavengeMode;
+import com.alibaba.otter.canal.meta.FileMixedMetaManager;
 import com.alibaba.otter.canal.meta.MemoryMetaManager;
 import com.alibaba.otter.canal.meta.PeriodMixedMetaManager;
 import com.alibaba.otter.canal.meta.ZooKeeperMetaManager;
@@ -32,7 +38,12 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.index.*;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
+import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
+import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
+import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
@@ -110,7 +121,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             ZooKeeperMetaManager zooKeeperMetaManager = new ZooKeeperMetaManager();
             zooKeeperMetaManager.setZkClientx(getZkclientx());
             ((PeriodMixedMetaManager) metaManager).setZooKeeperMetaManager(zooKeeperMetaManager);
-        } else if (mode.isLocalFile()){
+        } else if (mode.isLocalFile()) {
             FileMixedMetaManager fileMixedMetaManager = new FileMixedMetaManager();
             fileMixedMetaManager.setDataDir(parameters.getDataDir());
             fileMixedMetaManager.setPeriod(parameters.getMetaFileFlushPeriod());

+ 38 - 2
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -93,6 +93,10 @@ public class CanalParameter implements Serializable {
     private Boolean                  filterTableError                   = Boolean.FALSE;             // 是否忽略表解析异常
     private String                   blackFilter                        = null;                      // 匹配黑名单,忽略解析
 
+    private Boolean                  tsdbEnable                         = Boolean.FALSE;             // 是否开启tableMetaTSDB
+    private String                   tsdbJdbcUrl;
+    private String                   tsdbJdbcUserName;
+    private String                   tsdbJdbcPassword;
     // ================================== 兼容字段处理
     private InetSocketAddress        masterAddress;                                                  // 主库信息
     private String                   masterUsername;                                                 // 帐号
@@ -246,7 +250,7 @@ public class CanalParameter implements Serializable {
         ZOOKEEPER,
         /** 混合模式,内存+文件 */
         MIXED,
-        /** 本地文件存储模式*/
+        /** 本地文件存储模式 */
         LOCAL_FILE;
 
         public boolean isMemory() {
@@ -261,7 +265,7 @@ public class CanalParameter implements Serializable {
             return this.equals(MetaMode.MIXED);
         }
 
-        public boolean isLocalFile(){
+        public boolean isLocalFile() {
             return this.equals(MetaMode.LOCAL_FILE);
         }
     }
@@ -883,6 +887,38 @@ public class CanalParameter implements Serializable {
         this.blackFilter = blackFilter;
     }
 
+    public Boolean getTsdbEnable() {
+        return tsdbEnable;
+    }
+
+    public void setTsdbEnable(Boolean tsdbEnable) {
+        this.tsdbEnable = tsdbEnable;
+    }
+
+    public String getTsdbJdbcUrl() {
+        return tsdbJdbcUrl;
+    }
+
+    public void setTsdbJdbcUrl(String tsdbJdbcUrl) {
+        this.tsdbJdbcUrl = tsdbJdbcUrl;
+    }
+
+    public String getTsdbJdbcUserName() {
+        return tsdbJdbcUserName;
+    }
+
+    public void setTsdbJdbcUserName(String tsdbJdbcUserName) {
+        this.tsdbJdbcUserName = tsdbJdbcUserName;
+    }
+
+    public String getTsdbJdbcPassword() {
+        return tsdbJdbcPassword;
+    }
+
+    public void setTsdbJdbcPassword(String tsdbJdbcPassword) {
+        this.tsdbJdbcPassword = tsdbJdbcPassword;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 36 - 20
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -8,34 +8,38 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
-import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBFactory;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 
 public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
-    protected final Logger            logger                    = LoggerFactory.getLogger(this.getClass());
-    protected static final long       BINLOG_START_OFFEST       = 4L;
+    protected final Logger         logger                    = LoggerFactory.getLogger(this.getClass());
+    protected static final long    BINLOG_START_OFFEST       = 4L;
+
+    protected TableMetaTSDBFactory tableMetaTSDBFactory      = new DefaultTableMetaTSDBFactory();
+    protected boolean              enableTsdb                = false;
+    protected String               tsdbSpringXml;
+    protected TableMetaTSDB        tableMetaTSDB;
 
-    protected boolean                 enableTsdb                = false;
-    protected String                  tsdbSpringXml;
-    protected TableMetaTSDB           tableMetaTSDB;
     // 编码信息
-    protected byte                    connectionCharsetNumber   = (byte) 33;
-    protected Charset                 connectionCharset         = Charset.forName("UTF-8");
-    protected boolean                 filterQueryDcl            = false;
-    protected boolean                 filterQueryDml            = false;
-    protected boolean                 filterQueryDdl            = false;
-    protected boolean                 filterRows                = false;
-    protected boolean                 filterTableError          = false;
-    protected boolean                 useDruidDdlFilter         = true;
-    private final AtomicLong          eventsPublishBlockingTime = new AtomicLong(0L);
+    protected byte                 connectionCharsetNumber   = (byte) 33;
+    protected Charset              connectionCharset         = Charset.forName("UTF-8");
+    protected boolean              filterQueryDcl            = false;
+    protected boolean              filterQueryDml            = false;
+    protected boolean              filterQueryDdl            = false;
+    protected boolean              filterRows                = false;
+    protected boolean              filterTableError          = false;
+    protected boolean              useDruidDdlFilter         = true;
+    private final AtomicLong       eventsPublishBlockingTime = new AtomicLong(0L);
 
     protected BinlogParser buildParser() {
         LogEventConvert convert = new LogEventConvert();
@@ -93,8 +97,16 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     public void start() throws CanalParseException {
         if (enableTsdb) {
             if (tableMetaTSDB == null) {
-                // 初始化
-                tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
+                synchronized (CanalEventParser.class) {
+                    try {
+                        // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
+                        System.setProperty("canal.instance.destination", destination);
+                        // 初始化
+                        tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
+                    } finally {
+                        System.setProperty("canal.instance.destination", "");
+                    }
+                }
             }
         }
 
@@ -103,7 +115,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
     public void stop() throws CanalParseException {
         if (enableTsdb) {
-            TableMetaTSDBBuilder.destory(destination);
+            tableMetaTSDBFactory.destory(destination);
             tableMetaTSDB = null;
         }
 
@@ -177,7 +189,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         if (this.enableTsdb) {
             if (tableMetaTSDB == null) {
                 // 初始化
-                tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
+                tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
             }
         }
     }
@@ -187,7 +199,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         if (this.enableTsdb) {
             if (tableMetaTSDB == null) {
                 // 初始化
-                tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
+                tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
             }
         }
     }
@@ -195,5 +207,9 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     public AtomicLong getEventsPublishBlockingTime() {
         return this.eventsPublishBlockingTime;
     }
+    
+    public void setTableMetaTSDBFactory(TableMetaTSDBFactory tableMetaTSDBFactory) {
+        this.tableMetaTSDBFactory = tableMetaTSDBFactory;
+    }
 
 }

+ 0 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -63,7 +63,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private String             detectingSQL;                                 // 心跳sql
     private MysqlConnection    metaConnection;                               // 查询meta信息的链接
     private TableMetaCache     tableMetaCache;                               // 对应meta
-                                                                              // cache
     private int                fallbackIntervalInSeconds         = 60;       // 切换回退时间
     private BinlogFormat[]     supportBinlogFormats;                         // 支持的binlogFormat,如果设置会执行强校验
     private BinlogImage[]      supportBinlogImages;                          // 支持的binlogImage,如果设置会执行强校验

+ 8 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -541,12 +541,16 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                     tableError |= parseOneRow(rowDataBuilder, event, buffer, changeColumns, true, tableMeta);
                 }
 
-                rowsCount ++;
+                rowsCount++;
                 rowChangeBuider.addRowDatas(rowDataBuilder.build());
             }
 
             TableMapLogEvent table = event.getTable();
-            Header header = createHeader(event.getHeader(), table.getDbName(), table.getTableName(), eventType, rowsCount);
+            Header header = createHeader(event.getHeader(),
+                table.getDbName(),
+                table.getTableName(),
+                eventType,
+                rowsCount);
 
             RowChange rowChange = rowChangeBuider.build();
             if (tableError) {
@@ -801,12 +805,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
     }
 
-
     private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType) {
         return createHeader(logHeader, schemaName, tableName, eventType, -1);
     }
 
-    private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType, Integer rowsCount) {
+    private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType,
+                                Integer rowsCount) {
         // header会做信息冗余,方便以后做检索或者过滤
         Header.Builder headerBuilder = Header.newBuilder();
         headerBuilder.setVersion(version);
@@ -960,5 +964,4 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     public void setGtidSet(GTIDSet gtidSet) {
         this.gtidSet = gtidSet;
     }
-
 }

+ 19 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DefaultTableMetaTSDBFactory.java

@@ -0,0 +1,19 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+/**
+ * @author agapple 2017年10月11日 下午8:45:40
+ * @since 1.0.25
+ */
+public class DefaultTableMetaTSDBFactory implements TableMetaTSDBFactory {
+
+    /**
+     * 代理一下tableMetaTSDB的获取,使用隔离的spring定义
+     */
+    public TableMetaTSDB build(String destination, String springXml) {
+        return TableMetaTSDBBuilder.build(destination, springXml);
+    }
+
+    public void destory(String destination) {
+        TableMetaTSDBBuilder.destory(destination);
+    }
+}

+ 5 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java

@@ -10,12 +10,15 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
 import com.google.common.collect.Maps;
 
 /**
- * @author agapple 2017年10月11日 下午8:45:40
+ * tableMeta构造器
+ * 
+ * @author agapple 2018年8月8日 上午11:01:08
  * @since 1.0.25
  */
+
 public class TableMetaTSDBBuilder {
 
-    protected final static Logger                                        logger   = LoggerFactory.getLogger(TableMetaTSDBBuilder.class);
+    protected final static Logger                                        logger   = LoggerFactory.getLogger(DefaultTableMetaTSDBFactory.class);
     private static ConcurrentMap<String, ClassPathXmlApplicationContext> contexts = Maps.newConcurrentMap();
 
     /**

+ 18 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBFactory.java

@@ -0,0 +1,18 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+/**
+ * tableMeta构造器,允许重载实现
+ * 
+ * @author agapple 2018年8月8日 上午11:01:08
+ * @since 1.0.26
+ */
+
+public interface TableMetaTSDBFactory {
+
+    /**
+     * 代理一下tableMetaTSDB的获取,使用隔离的spring定义
+     */
+    public TableMetaTSDB build(String destination, String springXml);
+
+    public void destory(String destination);
+}

+ 38 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/NoStorageTest.java

@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Date;
+
+public class NoStorageTest {
+    final String DBNAME = "testdb";
+    final String TBNAME = "testtb";
+    final String DDL = "CREATE TABLE `testtb` (\n" +
+            "   `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
+            "   `name` varchar(2048) DEFAULT NULL,\n" +
+            "   `datachange_lasttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最晚更新时间',\n" +
+            "   `otter_testcol` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol1` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol2` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol3` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol4` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol5` varchar(45) DEFAULT NULL,\n" +
+            "   PRIMARY KEY (`id`)\n" +
+            " ) ENGINE=InnoDB AUTO_INCREMENT=58333898 DEFAULT CHARSET=utf8mb4";
+    @Test
+    public void nostorage() {
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+        TableMetaCacheWithStorage tableMetaCacheWithStorage = new TableMetaCacheWithStorage(connection, null);
+        EntryPosition entryPosition = new EntryPosition();
+        entryPosition.setTimestamp(new Date().getTime());
+        String fullTableName = DBNAME + "." + TBNAME;
+        tableMetaCacheWithStorage.apply(entryPosition, fullTableName, DDL, null);
+        entryPosition.setTimestamp(new Date().getTime() + 1000L);
+        TableMeta result = tableMetaCacheWithStorage.getTableMeta(DBNAME, TBNAME, false, entryPosition);
+        assert result.getDdl().equalsIgnoreCase(DDL);
+    }
+}

+ 77 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/StorageTest.java

@@ -0,0 +1,77 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaCallback;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaStorageFactory;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import com.alibaba.otter.canal.protocol.position.Position;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class StorageTest {
+
+    final String DBNAME = "testdb";
+    final String TBNAME = "testtb";
+    final String DDL = "CREATE TABLE `testtb` (\n" +
+            "   `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
+            "   `name` varchar(2048) DEFAULT NULL,\n" +
+            "   `datachange_lasttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最晚更新时间',\n" +
+            "   `otter_testcol` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol1` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol2` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol3` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol4` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol5` varchar(45) DEFAULT NULL,\n" +
+            "   PRIMARY KEY (`id`)\n" +
+            " ) ENGINE=InnoDB AUTO_INCREMENT=58333898 DEFAULT CHARSET=utf8mb4";
+
+    @Test
+    public void storage() {
+
+        MySqlTableMetaStorageFactory factory = new MySqlTableMetaStorageFactory(new MySqlTableMetaCallback() {
+            @Override
+            public void save(String dbAddress, String schema, String table, String ddl, Long timestamp) {
+
+            }
+
+            @Override
+            public List<TableMetaEntry> fetch(String dbAddress, String dbName) {
+                TableMetaEntry tableMeta = new TableMetaEntry();
+                tableMeta.setSchema(DBNAME);
+                tableMeta.setTable(TBNAME);
+                tableMeta.setDdl(DDL);
+                tableMeta.setTimestamp(new Date().getTime());
+                List<TableMetaEntry> entries = new ArrayList<TableMetaEntry>();
+                entries.add(tableMeta);
+                return entries;
+            }
+
+            @Override
+            public List<TableMetaEntry> fetch(String dbAddress, String dbName, String tableName) {
+                TableMetaEntry tableMeta = new TableMetaEntry();
+                tableMeta.setSchema(DBNAME);
+                tableMeta.setTable(TBNAME);
+                tableMeta.setDdl(DDL);
+                tableMeta.setTimestamp(new Date().getTime());
+                List<TableMetaEntry> entries = new ArrayList<TableMetaEntry>();
+                entries.add(tableMeta);
+                return entries;
+            }
+        }, DBNAME);
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+        TableMetaCacheWithStorage tableMetaCacheWithStorage = new TableMetaCacheWithStorage(connection, factory.getTableMetaStorage());
+        EntryPosition entryPosition = new EntryPosition();
+        entryPosition.setTimestamp(new Date().getTime());
+        String fullTableName = DBNAME + "." + TBNAME;
+        tableMetaCacheWithStorage.apply(entryPosition, fullTableName, DDL, null);
+
+        entryPosition.setTimestamp(new Date().getTime() + 1000L);
+        TableMeta result = tableMetaCacheWithStorage.getTableMeta(DBNAME, TBNAME, false, entryPosition);
+        assert result.getDdl().equalsIgnoreCase(DDL);
+    }
+}

+ 4 - 4
pom.xml

@@ -96,8 +96,8 @@
         <maven.test.skip>true</maven.test.skip>
         <downloadSources>true</downloadSources>
         <!-- compiler settings properties -->
-        <java_source_version>1.6</java_source_version>
-        <java_target_version>1.6</java_target_version>
+        <java_source_version>1.7</java_source_version>
+        <java_target_version>1.7</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <spring_version>3.2.9.RELEASE</spring_version>
     </properties>
@@ -247,7 +247,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_520</version>
+                <version>2.0.0_preview_540</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
@@ -332,7 +332,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.2</version>
+                <version>3.8.0</version>
                 <configuration>
                     <source>${java_source_version}</source>
                     <target>${java_target_version}</target>