Browse Source

merge tsdb meta support and refactor

agapple 7 years ago
parent
commit
19707f92d4
45 changed files with 573 additions and 887 deletions
  1. 4 8
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/BinlogPosition.java
  2. 2 7
      deployer/src/main/resources/example/instance.properties
  3. 1 5
      deployer/src/main/resources/spring/file-instance.xml
  4. 1 4
      deployer/src/main/resources/spring/tsdb/dal-dao.xml
  5. 0 0
      deployer/src/main/resources/spring/tsdb/mybatis-config.xml
  6. 0 6
      deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml
  7. 0 0
      deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml
  8. 0 0
      deployer/src/main/resources/spring/tsdb/sqlmap-config.xml
  9. 1 1
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java
  10. 56 1
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlQueryExecutor.java
  11. 4 3
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/ClientAuthenticationPacket.java
  12. 3 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  13. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/BinlogParser.java
  14. 12 12
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java
  15. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  16. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java
  17. 11 63
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  18. 9 11
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  19. 0 77
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DBMSAction.java
  20. 121 89
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  21. 86 79
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
  22. 19 20
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DdlResult.java
  23. 17 17
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DruidDdlParser.java
  24. 1 99
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/SimpleDdlParser.java
  25. 25 27
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DataSourceFactoryTSDB.java
  26. 3 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
  27. 130 240
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaManager.java
  28. 4 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDAO.java
  29. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaSnapshotDAO.java
  30. 5 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/model/MetaHistoryDO.java
  31. 5 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/model/MetaSnapshotDO.java
  32. 1 0
      parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java
  33. 1 0
      parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java
  34. 31 22
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/TableMetaCacheTest.java
  35. 0 1
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/GroupEventPaserTest.java
  36. 2 2
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/SimpleDdlParserTest.java
  37. 2 2
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMetaTest.java
  38. 3 6
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaHistoryDAOTest.java
  39. 0 2
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaManagerTest.java
  40. 0 46
      parse/src/test/resources/dal-dao.xml
  41. 3 6
      parse/src/test/resources/tsdb/dal-dao.xml
  42. 0 0
      parse/src/test/resources/tsdb/mybatis-config.xml
  43. 0 0
      parse/src/test/resources/tsdb/sql-map/sqlmap_history.xml
  44. 0 0
      parse/src/test/resources/tsdb/sql-map/sqlmap_snapshot.xml
  45. 0 0
      parse/src/test/resources/tsdb/sqlmap-config.xml

+ 4 - 8
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/BinlogPosition.java

@@ -1,6 +1,5 @@
 package com.taobao.tddl.dbsync.binlog;
 package com.taobao.tddl.dbsync.binlog;
 
 
-import com.taobao.tddl.dbsync.binlog.LogPosition;
 
 
 /**
 /**
  * Position inside binlog file
  * Position inside binlog file
@@ -35,9 +34,8 @@ public class BinlogPosition extends LogPosition {
     }
     }
 
 
     private final static long[] pow10 = { 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000,
     private final static long[] pow10 = { 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000,
-        10000000000L, 100000000000L, 1000000000000L, 10000000000000L,
-        100000000000000L, 1000000000000000L, 10000000000000000L, 100000000000000000L,
-        1000000000000000000L };
+            10000000000L, 100000000000L, 1000000000000L, 10000000000000L, 100000000000000L, 1000000000000000L,
+            10000000000000000L, 100000000000000000L, 1000000000000000000L };
 
 
     public static String placeHolder(int bit, long number) {
     public static String placeHolder(int bit, long number) {
         if (bit > 18) {
         if (bit > 18) {
@@ -53,9 +51,6 @@ public class BinlogPosition extends LogPosition {
         return String.valueOf(max + number).substring(1);
         return String.valueOf(max + number).substring(1);
     }
     }
 
 
-    /**
-     * Return BinlogPosition in String representation. This serves as EventId for DBMSEvent.
-     */
     public String format2String(final int positionMaxLen) {
     public String format2String(final int positionMaxLen) {
         String binlogSuffix = fileName;
         String binlogSuffix = fileName;
         String binlogOffset = placeHolder((int) positionMaxLen, position);
         String binlogOffset = placeHolder((int) positionMaxLen, position);
@@ -90,7 +85,8 @@ public class BinlogPosition extends LogPosition {
         if (sharpIndex != -1) {
         if (sharpIndex != -1) {
             binlogPosition = Long.parseLong(source.substring(miscIndex, sharpIndex));
             binlogPosition = Long.parseLong(source.substring(miscIndex, sharpIndex));
         } else if (semicolonIndex != -1) {
         } else if (semicolonIndex != -1) {
-            binlogPosition = Long.parseLong(source.substring(miscIndex, semicolonIndex)); // NOTE: 向后兼容
+            binlogPosition = Long.parseLong(source.substring(miscIndex, semicolonIndex)); // NOTE:
+                                                                                          // 向后兼容
         } else if (dotIndex != -1) {
         } else if (dotIndex != -1) {
             binlogPosition = Long.parseLong(source.substring(miscIndex, dotIndex));
             binlogPosition = Long.parseLong(source.substring(miscIndex, dotIndex));
         } else {
         } else {

+ 2 - 7
deployer/src/main/resources/example/instance.properties

@@ -9,15 +9,10 @@ canal.instance.master.timestamp=1506088042
 
 
 
 
 # tsdb info
 # tsdb info
-canal.instance.tsdb.address=127.0.0.1:3306
 canal.instance.tsdb.enable=true
 canal.instance.tsdb.enable=true
+canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/tsdb
 canal.instance.tsdb.dbUsername=canal
 canal.instance.tsdb.dbUsername=canal
 canal.instance.tsdb.dbPassword=canal
 canal.instance.tsdb.dbPassword=canal
-#  tsdb关联的目标db,和canal.instance.defaultDatabaseName意义相同
-canal.instance.tsdb.defaultDatabaseName=test
-canal.instance.tsdb.connectionCharset=UTF-8
-canal.instance.tsdb.driverClassName=com.mysql.jdbc.Driver
-canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/tsdb
 
 
 
 
 #canal.instance.standby.address =
 #canal.instance.standby.address =
@@ -31,6 +26,6 @@ canal.instance.defaultDatabaseName=test
 canal.instance.connectionCharset=UTF-8
 canal.instance.connectionCharset=UTF-8
 # table regex
 # table regex
 canal.instance.filter.regex=.*\\..*
 canal.instance.filter.regex=.*\\..*
-# table black regex , 默认过滤表结构管理的数据
+# table black regex
 canal.instance.filter.black.regex=tsdb\\..*
 canal.instance.filter.black.regex=tsdb\\..*
 #################################################
 #################################################

+ 1 - 5
deployer/src/main/resources/spring/file-instance.xml

@@ -180,13 +180,10 @@
 	<!--tsdb related-->
 	<!--tsdb related-->
 	<bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
 	<bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
 		  factory-method="getDataSource">
 		  factory-method="getDataSource">
-		<constructor-arg index="0" value="${canal.instance.tsdb.address}"/>
+		<constructor-arg index="0" value="${canal.instance.tsdb.url}"/>
 		<constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
 		<constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
 		<constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
 		<constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
 		<constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
 		<constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
-		<constructor-arg index="4" value="${canal.instance.tsdb.defaultDatabaseName}"/>
-		<constructor-arg index="5" value="${canal.instance.tsdb.url}"/>
-		<constructor-arg index="6" value="${canal.instance.tsdb.driverClassName}"/>
 	</bean>
 	</bean>
 
 
 	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
 	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
@@ -210,5 +207,4 @@
 	<bean id="metaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
 	<bean id="metaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
 		<property name="sqlMapClient" ref="sqlMapClient"/>
 		<property name="sqlMapClient" ref="sqlMapClient"/>
 	</bean>
 	</bean>
-
 </beans>
 </beans>

+ 1 - 4
deployer/src/main/resources/spring/dal-dao.xml → deployer/src/main/resources/spring/tsdb/dal-dao.xml

@@ -9,13 +9,10 @@
 
 
     <bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
     <bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
           factory-method="getDataSource">
           factory-method="getDataSource">
-        <constructor-arg index="0" value="${canal.instance.tsdb.address}"/>
+        <constructor-arg index="0" value="${canal.instance.tsdb.url}"/>
         <constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
         <constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
         <constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
         <constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
         <constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
         <constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
-        <constructor-arg index="4" value="${canal.instance.tsdb.defaultDatabaseName}"/>
-        <constructor-arg index="5" value="${canal.instance.tsdb.url}"/>
-        <constructor-arg index="6" value="${canal.instance.tsdb.driverClassName}"/>
     </bean>
     </bean>
 
 
     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">

+ 0 - 0
deployer/src/main/resources/spring/mybatis-config.xml → deployer/src/main/resources/spring/tsdb/mybatis-config.xml


+ 0 - 6
parse/src/main/resources/sql-map/sqlmap_history.xml → deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml

@@ -6,18 +6,14 @@
 
 
     <sql id="allColumns">
     <sql id="allColumns">
         <![CDATA[
         <![CDATA[
-
 		gmt_create,gmt_modified,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,use_schema,`schema`,`table`,`sql`,`type`,`extra`
 		gmt_create,gmt_modified,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,use_schema,`schema`,`table`,`sql`,`type`,`extra`
-
         ]]>
         ]]>
     </sql>
     </sql>
     <sql id="allVOColumns">
     <sql id="allVOColumns">
         <![CDATA[
         <![CDATA[
-
 		a.id as id,a.gmt_create as gmtCreate,a.gmt_modified as gmtModified,
 		a.id as id,a.gmt_create as gmtCreate,a.gmt_modified as gmtModified,
 		a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMasterId,a.binlog_timestamp as binlogTimestamp,
 		a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMasterId,a.binlog_timestamp as binlogTimestamp,
 		a.use_schema as useSchema,a.`schema` as `schema`,a.`table` as `table`,a.`sql` as `sql`,a.`type` as `type`,a.`extra` as `extra`
 		a.use_schema as useSchema,a.`schema` as `schema`,a.`table` as `table`,a.`sql` as `sql`,a.`type` as `type`,a.`extra` as `extra`
-
         ]]>
         ]]>
     </sql>
     </sql>
 
 
@@ -42,10 +38,8 @@
 
 
     <delete id="deleteByGmtModified" parameterClass="java.util.Map">
     <delete id="deleteByGmtModified" parameterClass="java.util.Map">
         <![CDATA[
         <![CDATA[
-
 		delete from `canal_table_meta_history`
 		delete from `canal_table_meta_history`
 		where gmt_modified < date_sub(now(),interval #interval# second)
 		where gmt_modified < date_sub(now(),interval #interval# second)
-
         ]]>
         ]]>
     </delete>
     </delete>
 
 

+ 0 - 0
deployer/src/main/resources/spring/sql-map/sqlmap_snapshot.xml → deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml


+ 0 - 0
deployer/src/main/resources/spring/sqlmap-config.xml → deployer/src/main/resources/spring/tsdb/sqlmap-config.xml


+ 1 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -34,7 +34,7 @@ public class MysqlConnector {
     private byte                charsetNumber     = 33;
     private byte                charsetNumber     = 33;
     private String              defaultSchema     = "retl";
     private String              defaultSchema     = "retl";
     private int                 soTimeout         = 30 * 1000;
     private int                 soTimeout         = 30 * 1000;
-    private int connTimeout = 5 * 1000;
+    private int                 connTimeout       = 5 * 1000;
     private int                 receiveBufferSize = 16 * 1024;
     private int                 receiveBufferSize = 16 * 1024;
     private int                 sendBufferSize    = 16 * 1024;
     private int                 sendBufferSize    = 16 * 1024;
 
 

+ 56 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlQueryExecutor.java

@@ -6,6 +6,7 @@ import java.util.List;
 
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QueryCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QueryCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.EOFPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetHeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetHeaderPacket;
@@ -93,11 +94,65 @@ public class MysqlQueryExecutor {
         return resultSet;
         return resultSet;
     }
     }
 
 
-    private void readEofPacket() throws IOException {
+    public List<ResultSetPacket> queryMulti(String queryString) throws IOException {
+        QueryCommandPacket cmd = new QueryCommandPacket();
+        cmd.setQueryString(queryString);
+        byte[] bodyBytes = cmd.toBytes();
+        PacketManager.writeBody(channel, bodyBytes);
+        List<ResultSetPacket> resultSets = new ArrayList<ResultSetPacket>();
+        boolean moreResult = true;
+        while (moreResult) {
+            byte[] body = readNextPacket();
+            if (body[0] < 0) {
+                ErrorPacket packet = new ErrorPacket();
+                packet.fromBytes(body);
+                throw new IOException(packet + "\n with command: " + queryString);
+            }
+
+            ResultSetHeaderPacket rsHeader = new ResultSetHeaderPacket();
+            rsHeader.fromBytes(body);
+
+            List<FieldPacket> fields = new ArrayList<FieldPacket>();
+            for (int i = 0; i < rsHeader.getColumnCount(); i++) {
+                FieldPacket fp = new FieldPacket();
+                fp.fromBytes(readNextPacket());
+                fields.add(fp);
+            }
+
+            moreResult = readEofPacket();
+
+            List<RowDataPacket> rowData = new ArrayList<RowDataPacket>();
+            while (true) {
+                body = readNextPacket();
+                if (body[0] == -2) {
+                    break;
+                }
+                RowDataPacket rowDataPacket = new RowDataPacket();
+                rowDataPacket.fromBytes(body);
+                rowData.add(rowDataPacket);
+            }
+
+            ResultSetPacket resultSet = new ResultSetPacket();
+            resultSet.getFieldDescriptors().addAll(fields);
+            for (RowDataPacket r : rowData) {
+                resultSet.getFieldValues().addAll(r.getColumns());
+            }
+            resultSet.setSourceAddress(channel.getRemoteSocketAddress());
+            resultSets.add(resultSet);
+        }
+
+        return resultSets;
+    }
+
+    private boolean readEofPacket() throws IOException {
         byte[] eofBody = readNextPacket();
         byte[] eofBody = readNextPacket();
+        EOFPacket packet = new EOFPacket();
+        packet.fromBytes(eofBody);
         if (eofBody[0] != -2) {
         if (eofBody[0] != -2) {
             throw new IOException("EOF Packet is expected, but packet with field_count=" + eofBody[0] + " is found.");
             throw new IOException("EOF Packet is expected, but packet with field_count=" + eofBody[0] + " is found.");
         }
         }
+
+        return (packet.statusFlag & 0x0008) != 0;
     }
     }
 
 
     protected byte[] readNextPacket() throws IOException {
     protected byte[] readNextPacket() throws IOException {

+ 4 - 3
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/ClientAuthenticationPacket.java

@@ -47,10 +47,11 @@ public class ClientAuthenticationPacket extends PacketWithHeaderPacket {
         /**
         /**
          * CLIENT_LONG_PASSWORD CLIENT_LONG_FLAG CLIENT_PROTOCOL_41
          * CLIENT_LONG_PASSWORD CLIENT_LONG_FLAG CLIENT_PROTOCOL_41
          * CLIENT_INTERACTIVE CLIENT_TRANSACTIONS CLIENT_SECURE_CONNECTION
          * CLIENT_INTERACTIVE CLIENT_TRANSACTIONS CLIENT_SECURE_CONNECTION
+         * CLIENT_MULTI_STATEMENTS;
          */
          */
-        ByteHelper.writeUnsignedIntLittleEndian(1 | 4 | 512 | 8192 | 32768, out); // remove
-                                                                                  // client_interactive
-                                                                                  // feature
+        ByteHelper.writeUnsignedIntLittleEndian(1 | 4 | 512 | 8192 | 32768 | 0x00010000, out); // remove
+        // client_interactive
+        // feature
 
 
         // 2. write max_packet_size
         // 2. write max_packet_size
         ByteHelper.writeUnsignedIntLittleEndian(MSC.MAX_PACKET_LENGTH, out);
         ByteHelper.writeUnsignedIntLittleEndian(MSC.MAX_PACKET_LENGTH, out);

+ 3 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -173,7 +173,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
 
 
                             public boolean sink(EVENT event) {
                             public boolean sink(EVENT event) {
                                 try {
                                 try {
-                                    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event,false);
+                                    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false);
 
 
                                     if (!running) {
                                     if (!running) {
                                         return false;
                                         return false;
@@ -320,13 +320,13 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         return result;
         return result;
     }
     }
 
 
-    protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod,boolean isSeek) throws Exception {
+    protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod, boolean isSeek) throws Exception {
         long startTs = -1;
         long startTs = -1;
         boolean enabled = getProfilingEnabled();
         boolean enabled = getProfilingEnabled();
         if (enabled) {
         if (enabled) {
             startTs = System.currentTimeMillis();
             startTs = System.currentTimeMillis();
         }
         }
-        CanalEntry.Entry event = binlogParser.parse(bod,isSeek);
+        CanalEntry.Entry event = binlogParser.parse(bod, isSeek);
         if (enabled) {
         if (enabled) {
             this.parsingInterval = System.currentTimeMillis() - startTs;
             this.parsingInterval = System.currentTimeMillis() - startTs;
         }
         }

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/BinlogParser.java

@@ -11,7 +11,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry;
  */
  */
 public interface BinlogParser<T> extends CanalLifeCycle {
 public interface BinlogParser<T> extends CanalLifeCycle {
 
 
-    CanalEntry.Entry parse(T event,boolean isSeek) throws CanalParseException;
+    CanalEntry.Entry parse(T event, boolean isSeek) throws CanalParseException;
 
 
     void reset();
     void reset();
 }
 }

+ 12 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java

@@ -20,16 +20,16 @@ import org.apache.commons.lang.StringUtils;
  */
  */
 public class TableMeta {
 public class TableMeta {
 
 
-    private String schema;
-    private String table;
+    private String          schema;
+    private String          table;
     private List<FieldMeta> fields = new ArrayList<TableMeta.FieldMeta>();
     private List<FieldMeta> fields = new ArrayList<TableMeta.FieldMeta>();
-    private String ddl; // 表结构的DDL语句
+    private String          ddl;                                          // 表结构的DDL语句
 
 
-    public TableMeta() {
+    public TableMeta(){
 
 
     }
     }
 
 
-    public TableMeta(String schema, String table, List<FieldMeta> fields) {
+    public TableMeta(String schema, String table, List<FieldMeta> fields){
         this.schema = schema;
         this.schema = schema;
         this.table = table;
         this.table = table;
         this.fields = fields;
         this.fields = fields;
@@ -109,11 +109,11 @@ public class TableMeta {
 
 
     public static class FieldMeta {
     public static class FieldMeta {
 
 
-        public FieldMeta() {
+        public FieldMeta(){
 
 
         }
         }
 
 
-        public FieldMeta(String columnName, String columnType, boolean nullable, boolean key, String defaultValue) {
+        public FieldMeta(String columnName, String columnType, boolean nullable, boolean key, String defaultValue){
             this.columnName = columnName;
             this.columnName = columnName;
             this.columnType = columnType;
             this.columnType = columnType;
             this.nullable = nullable;
             this.nullable = nullable;
@@ -121,12 +121,12 @@ public class TableMeta {
             this.defaultValue = defaultValue;
             this.defaultValue = defaultValue;
         }
         }
 
 
-        private String columnName;
-        private String columnType;
+        private String  columnName;
+        private String  columnType;
         private boolean nullable;
         private boolean nullable;
         private boolean key;
         private boolean key;
-        private String defaultValue;
-        private String extra;
+        private String  defaultValue;
+        private String  extra;
 
 
         public String getColumnName() {
         public String getColumnName() {
             return columnName;
             return columnName;
@@ -182,7 +182,7 @@ public class TableMeta {
 
 
         public String toString() {
         public String toString() {
             return "FieldMeta [columnName=" + columnName + ", columnType=" + columnType + ", defaultValue="
             return "FieldMeta [columnName=" + columnName + ", columnType=" + columnType + ", defaultValue="
-                + defaultValue + ", nullable=" + nullable + ", key=" + key + "]";
+                   + defaultValue + ", nullable=" + nullable + ", key=" + key + "]";
         }
         }
     }
     }
 
 

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -30,8 +30,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     protected boolean           filterTableError        = false;
     protected boolean           filterTableError        = false;
 
 
     @Resource
     @Resource
-    protected TableMetaManager tableMetaManager;
-    protected boolean useDruidDdlFilter = true;
+    protected TableMetaManager  tableMetaManager;
+    protected boolean           useDruidDdlFilter       = true;
 
 
     protected BinlogParser buildParser() {
     protected BinlogParser buildParser() {
         LogEventConvert convert = new LogEventConvert();
         LogEventConvert convert = new LogEventConvert();
@@ -50,7 +50,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         convert.setFilterRows(filterRows);
         convert.setFilterRows(filterRows);
         convert.setFilterTableError(filterTableError);
         convert.setFilterTableError(filterTableError);
 
 
-        //初始化parser的时候也初始化管理mysql 表结构的管理器
+        // 初始化parser的时候也初始化管理mysql 表结构的管理器
         tableMetaManager.init();
         tableMetaManager.init();
         return convert;
         return convert;
     }
     }
@@ -66,6 +66,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
 
     /**
     /**
      * 回滚到指定位点
      * 回滚到指定位点
+     * 
      * @param position
      * @param position
      * @return
      * @return
      */
      */
@@ -77,7 +78,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         return true;
         return true;
     }
     }
 
 
-
     public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
     public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
         super.setEventBlackFilter(eventBlackFilter);
         super.setEventBlackFilter(eventBlackFilter);
 
 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -50,7 +50,7 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
             throw new CanalParseException(e);
             throw new CanalParseException(e);
         }
         }
 
 
-        tableMetaCache = new TableMetaCache(metaConnection,tableMetaManager);
+        tableMetaCache = new TableMetaCache(metaConnection, tableMetaManager);
         ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
         ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
     }
     }
 
 

+ 11 - 63
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -3,14 +3,8 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.List;
 import java.util.List;
-import java.util.Properties;
 
 
-import com.mysql.jdbc.Driver;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -27,28 +21,24 @@ import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
-
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
 
 public class MysqlConnection implements ErosaConnection {
 public class MysqlConnection implements ErosaConnection {
 
 
-    private static final Logger logger  = LoggerFactory.getLogger(MysqlConnection.class);
+    private static final Logger logger      = LoggerFactory.getLogger(MysqlConnection.class);
 
 
     private MysqlConnector      connector;
     private MysqlConnector      connector;
     private long                slaveId;
     private long                slaveId;
-    private Charset             charset = Charset.forName("UTF-8");
+    private Charset             charset     = Charset.forName("UTF-8");
     private BinlogFormat        binlogFormat;
     private BinlogFormat        binlogFormat;
     private BinlogImage         binlogImage;
     private BinlogImage         binlogImage;
 
 
     // tsdb releated
     // tsdb releated
-    private AuthenticationInfo authInfo;
-    private Connection conn;
-    protected int connTimeout = 5 * 1000; // 5秒
-    protected int soTimeout = 60 * 60 * 1000; // 1小时
-    protected int bufferSize = 16 * 1024;
-
+    private AuthenticationInfo  authInfo;
+    protected int               connTimeout = 5 * 1000;                                      // 5秒
+    protected int               soTimeout   = 60 * 60 * 1000;                                // 1小时
 
 
     public MysqlConnection(){
     public MysqlConnection(){
     }
     }
@@ -59,7 +49,7 @@ public class MysqlConnection implements ErosaConnection {
         authInfo.setUsername(username);
         authInfo.setUsername(username);
         authInfo.setPassword(password);
         authInfo.setPassword(password);
         connector = new MysqlConnector(address, username, password);
         connector = new MysqlConnector(address, username, password);
-        //将connection里面的参数透传下
+        // 将connection里面的参数透传下
         connector.setSoTimeout(soTimeout);
         connector.setSoTimeout(soTimeout);
         connector.setConnTimeout(connTimeout);
         connector.setConnTimeout(connTimeout);
     }
     }
@@ -72,29 +62,13 @@ public class MysqlConnection implements ErosaConnection {
         authInfo.setPassword(password);
         authInfo.setPassword(password);
         authInfo.setDefaultDatabaseName(defaultSchema);
         authInfo.setDefaultDatabaseName(defaultSchema);
         connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);
         connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);
-        //将connection里面的参数透传下
+        // 将connection里面的参数透传下
         connector.setSoTimeout(soTimeout);
         connector.setSoTimeout(soTimeout);
         connector.setConnTimeout(connTimeout);
         connector.setConnTimeout(connTimeout);
     }
     }
 
 
     public void connect() throws IOException {
     public void connect() throws IOException {
         connector.connect();
         connector.connect();
-        //准备一个connection连接给tsdb查询用
-        Properties info = new Properties();
-        info.put("user", authInfo.getUsername());
-        info.put("password", authInfo.getPassword());
-        info.put("connectTimeout", String.valueOf(connTimeout));
-        info.put("socketTimeout", String.valueOf(soTimeout));
-        String url = "jdbc:mysql://" + authInfo.getAddress().getHostName() + ":"
-            + String.valueOf(authInfo.getAddress().getPort()) + "?allowMultiQueries=true";
-        try {
-            Driver driver = new com.mysql.jdbc.Driver();
-            conn = driver.connect(url, info);
-            // conn = DriverManager.getConnection(url, info);
-        } catch (SQLException e) {
-            throw new CanalParseException(e);
-        }
-
     }
     }
 
 
     public void reconnect() throws IOException {
     public void reconnect() throws IOException {
@@ -114,23 +88,9 @@ public class MysqlConnection implements ErosaConnection {
         return exector.query(cmd);
         return exector.query(cmd);
     }
     }
 
 
-    public <T> T query(String sql, ProcessJdbcResult<T> processor) {
-        Statement stmt = null;
-        try {
-            stmt = conn.createStatement();
-            ResultSet rs = stmt.executeQuery(sql);
-            return processor.process(rs);
-        } catch (SQLException e) {
-            throw new CanalParseException(e);
-        } finally {
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                } catch (SQLException e) {
-                    // ignore
-                }
-            }
-        }
+    public List<ResultSetPacket> queryMulti(String cmd) throws IOException {
+        MysqlQueryExecutor exector = new MysqlQueryExecutor(connector);
+        return exector.queryMulti(cmd);
     }
     }
 
 
     public void update(String cmd) throws IOException {
     public void update(String cmd) throws IOException {
@@ -212,7 +172,7 @@ public class MysqlConnection implements ErosaConnection {
         connection.setCharset(getCharset());
         connection.setCharset(getCharset());
         connection.setSlaveId(getSlaveId());
         connection.setSlaveId(getSlaveId());
         connection.setConnector(connector.fork());
         connection.setConnector(connector.fork());
-        //set authInfo
+        // set authInfo
         connection.setAuthInfo(authInfo);
         connection.setAuthInfo(authInfo);
         return connection;
         return connection;
     }
     }
@@ -449,9 +409,6 @@ public class MysqlConnection implements ErosaConnection {
         return binlogImage;
         return binlogImage;
     }
     }
 
 
-    public Connection getConn() {
-        return conn;
-    }
     public InetSocketAddress getAddress() {
     public InetSocketAddress getAddress() {
         return authInfo.getAddress();
         return authInfo.getAddress();
     }
     }
@@ -464,10 +421,6 @@ public class MysqlConnection implements ErosaConnection {
         this.soTimeout = soTimeout;
         this.soTimeout = soTimeout;
     }
     }
 
 
-    public void setBufferSize(int bufferSize) {
-        this.bufferSize = bufferSize;
-    }
-
     public AuthenticationInfo getAuthInfo() {
     public AuthenticationInfo getAuthInfo() {
         return authInfo;
         return authInfo;
     }
     }
@@ -475,9 +428,4 @@ public class MysqlConnection implements ErosaConnection {
     public void setAuthInfo(AuthenticationInfo authInfo) {
     public void setAuthInfo(AuthenticationInfo authInfo) {
         this.authInfo = authInfo;
         this.authInfo = authInfo;
     }
     }
-
-    public static interface ProcessJdbcResult<T> {
-
-        public T process(ResultSet rs) throws SQLException;
-    }
 }
 }

+ 9 - 11
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -68,7 +68,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
 
 
     // update by yishun.chen,特殊异常处理参数
     // update by yishun.chen,特殊异常处理参数
     private int                dumpErrorCount                    = 0;        // binlogDump失败异常计数
     private int                dumpErrorCount                    = 0;        // binlogDump失败异常计数
-    private int dumpTimeoutCount = 0;// socketTimeout异常
     private int                dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
     private int                dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
 
 
     protected ErosaConnection buildErosaConnection() {
     protected ErosaConnection buildErosaConnection() {
@@ -118,12 +117,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 }
                 }
             }
             }
 
 
-            if(tableMetaManager != null){
+            if (tableMetaManager != null) {
                 tableMetaManager.setConnection(metaConnection);
                 tableMetaManager.setConnection(metaConnection);
                 tableMetaManager.setFilter(eventFilter);
                 tableMetaManager.setFilter(eventFilter);
             }
             }
 
 
-            tableMetaCache = new TableMetaCache(metaConnection,tableMetaManager);
+            tableMetaCache = new TableMetaCache(metaConnection, tableMetaManager);
             ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
             ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
         }
         }
     }
     }
@@ -443,7 +442,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
 
 
             public boolean sink(LogEvent event) {
             public boolean sink(LogEvent event) {
                 try {
                 try {
-                    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event,true);
+                    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);
                     if (entry == null) {
                     if (entry == null) {
                         return true;
                         return true;
                     }
                     }
@@ -476,7 +475,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
 
 
                 public boolean sink(LogEvent event) {
                 public boolean sink(LogEvent event) {
                     try {
                     try {
-                        CanalEntry.Entry entry = parseAndProfilingIfNecessary(event,true);
+                        CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);
                         if (entry == null) {
                         if (entry == null) {
                             return true;
                             return true;
                         }
                         }
@@ -555,7 +554,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 }
                 }
             } catch (Exception e) {
             } catch (Exception e) {
                 logger.warn(String.format("the binlogfile:%s doesn't exist, to continue to search the next binlogfile , caused by",
                 logger.warn(String.format("the binlogfile:%s doesn't exist, to continue to search the next binlogfile , caused by",
-                    startSearchBinlogFile), e);
+                    startSearchBinlogFile),
+                    e);
                 int binlogSeqNum = Integer.parseInt(startSearchBinlogFile.substring(startSearchBinlogFile.indexOf(".") + 1));
                 int binlogSeqNum = Integer.parseInt(startSearchBinlogFile.substring(startSearchBinlogFile.indexOf(".") + 1));
                 if (binlogSeqNum <= 1) {
                 if (binlogSeqNum <= 1) {
                     logger.warn("Didn't find the corresponding binlog files");
                     logger.warn("Didn't find the corresponding binlog files");
@@ -687,7 +687,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 public boolean sink(LogEvent event) {
                 public boolean sink(LogEvent event) {
                     EntryPosition entryPosition = null;
                     EntryPosition entryPosition = null;
                     try {
                     try {
-                        CanalEntry.Entry entry = parseAndProfilingIfNecessary(event,true);
+                        CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);
                         if (entry == null) {
                         if (entry == null) {
                             return true;
                             return true;
                         }
                         }
@@ -707,7 +707,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         }
                         }
 
 
                         if (StringUtils.equals(endPosition.getJournalName(), logfilename)
                         if (StringUtils.equals(endPosition.getJournalName(), logfilename)
-                            && endPosition.getPosition() <= (logfileoffset + event.getEventLen())) {
+                            && endPosition.getPosition() <= logfileoffset) {
                             return false;
                             return false;
                         }
                         }
 
 
@@ -715,9 +715,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         // position = current +
                         // position = current +
                         // data.length,代表该事务的下一条offest,避免多余的事务重复
                         // data.length,代表该事务的下一条offest,避免多余的事务重复
                         if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                         if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
-                            entryPosition = new EntryPosition(logfilename,
-                                logfileoffset + event.getEventLen(),
-                                logposTimestamp);
+                            entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
                             logger.debug("set {} to be pending start position before finding another proper one...",
                             logger.debug("set {} to be pending start position before finding another proper one...",
                                 entryPosition);
                                 entryPosition);
                             logPosition.setPostion(entryPosition);
                             logPosition.setPostion(entryPosition);

+ 0 - 77
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DBMSAction.java

@@ -1,77 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
-
-/**
- * Defines database change action types: INSERT, UPDATE, DELETE, OTHER.
- *
- * @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
- * @version 1.0
- */
-public enum DBMSAction {
-
-    INSERT('I'), UPDATE('U'), DELETE('D'), REPLACE('R'), OTHER('O'), CREATE('C'), ALTER('A'), ERASE('E'), QUERY('Q'),
-    ROWQUERY('W'), TRUNCATE('T'), CINDEX('X'), DINDEX('Y'), RENAME('Z');
-
-    protected final byte bValue;
-
-    DBMSAction(char ch){
-        this.bValue = (byte) ch;
-    }
-
-    /**
-     * Return action type from byte value.
-     */
-    public static DBMSAction fromValue(int iValue) {
-        switch ((char) iValue) {
-            case 'I':
-            case 'M': // MERGE (Oracle only)
-                return INSERT;
-            case 'U':
-                return UPDATE;
-            case 'D': // DELETE
-                return DELETE;
-            case 'R': // REPLACE
-                return REPLACE;
-            case 'C':
-                return CREATE;
-            case 'A':
-                return ALTER;
-            case 'E':
-                return ERASE;
-            case 'Q':
-                return QUERY;
-            case 'W':
-                return ROWQUERY;
-            case 'T':
-                return TRUNCATE;
-            case 'X':
-                return CINDEX;
-            case 'Y':
-                return DINDEX;
-            case 'Z':
-                return RENAME;
-        }
-        return OTHER;
-    }
-
-    /**
-     * Return action type from query.
-     */
-    public static DBMSAction fromQuery(String query) {
-        int length = query.length();
-        for (int index = 0; index < length; index++) {
-            char ch = query.charAt(index);
-            if (!Character.isWhitespace(ch)) {
-                return DBMSAction.fromValue(ch);
-            }
-        }
-
-        return OTHER;
-    }
-
-    /**
-     * Return byte value of action type.
-     */
-    public byte value() {
-        return bValue;
-    }
-}

+ 121 - 89
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -9,7 +9,6 @@ import java.sql.Types;
 import java.util.BitSet;
 import java.util.BitSet;
 import java.util.List;
 import java.util.List;
 
 
-import com.taobao.tddl.dbsync.binlog.BinlogPosition;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -22,7 +21,9 @@ import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.SimpleDdlParser.DdlResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DdlResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.SimpleDdlParser;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
@@ -35,6 +36,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
 import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
 import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
 import com.alibaba.otter.canal.protocol.CanalEntry.Type;
 import com.alibaba.otter.canal.protocol.CanalEntry.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
+import com.taobao.tddl.dbsync.binlog.BinlogPosition;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
@@ -87,6 +89,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private boolean                     filterTableError    = false;
     private boolean                     filterTableError    = false;
     // 新增rows过滤,用于仅订阅除rows以外的数据
     // 新增rows过滤,用于仅订阅除rows以外的数据
     private boolean                     filterRows          = false;
     private boolean                     filterRows          = false;
+    private boolean                     useDruidDdlFilter   = true;
 
 
     @Override
     @Override
     public Entry parse(LogEvent logEvent, boolean isSeek) throws CanalParseException {
     public Entry parse(LogEvent logEvent, boolean isSeek) throws CanalParseException {
@@ -100,7 +103,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 binlogFileName = ((RotateLogEvent) logEvent).getFilename();
                 binlogFileName = ((RotateLogEvent) logEvent).getFilename();
                 break;
                 break;
             case LogEvent.QUERY_EVENT:
             case LogEvent.QUERY_EVENT:
-                return parseQueryEvent((QueryLogEvent) logEvent,isSeek);
+                return parseQueryEvent((QueryLogEvent) logEvent, isSeek);
             case LogEvent.XID_EVENT:
             case LogEvent.XID_EVENT:
                 return parseXidEvent((XidLogEvent) logEvent);
                 return parseXidEvent((XidLogEvent) logEvent);
             case LogEvent.TABLE_MAP_EVENT:
             case LogEvent.TABLE_MAP_EVENT:
@@ -150,110 +153,139 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
             Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
             return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
             return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
         } else {
         } else {
-            // DDL语句处理
-            DdlResult result = SimpleDdlParser.parse(queryString, event.getDbName());
-
-            String schemaName = event.getDbName();
-            if (StringUtils.isNotEmpty(result.getSchemaName())) {
-                schemaName = result.getSchemaName();
-            }
-
-            String tableName = result.getTableName();
+            boolean notFilter = false;
             EventType type = EventType.QUERY;
             EventType type = EventType.QUERY;
-            // fixed issue https://github.com/alibaba/canal/issues/58
-            if (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE
-                || result.getType() == EventType.CREATE || result.getType() == EventType.TRUNCATE
-                || result.getType() == EventType.RENAME || result.getType() == EventType.CINDEX
-                || result.getType() == EventType.DINDEX) { // 针对DDL类型
-
-                if (filterQueryDdl) {
-                    return null;
+            String tableName = null;
+            String schemaName = null;
+            if (useDruidDdlFilter) {
+                List<DdlResult> results = DruidDdlParser.parse(queryString, event.getDbName());
+                for (DdlResult result : results) {
+                    if (!processFilter(queryString, result)) {
+                        // 只要有一个数据不进行过滤
+                        notFilter = true;
+                    }
+                }
+                if (results.size() > 0) {
+                    // 如果针对多行的DDL,只能取第一条
+                    type = results.get(0).getType();
+                    schemaName = results.get(0).getSchemaName();
+                    tableName = results.get(0).getTableName();
+                }
+            } else {
+                DdlResult result = SimpleDdlParser.parse(queryString, event.getDbName());
+                if (!processFilter(queryString, result)) {
+                    notFilter = true;
                 }
                 }
 
 
                 type = result.getType();
                 type = result.getType();
-                if (StringUtils.isEmpty(tableName)
-                    || (result.getType() == EventType.RENAME && StringUtils.isEmpty(result.getOriTableName()))) {
-                    // 如果解析不出tableName,记录一下日志,方便bugfix,目前直接抛出异常,中断解析
-                    throw new CanalParseException("SimpleDdlParser process query failed. pls submit issue with this queryString: "
-                                                  + queryString + " , and DdlResult: " + result.toString());
-                    // return null;
-                } else {
-                    // check name filter
-                    String name = schemaName + "." + tableName;
-                    if (nameFilter != null && !nameFilter.filter(name)) {
-                        if (result.getType() == EventType.RENAME) {
-                            // rename校验只要源和目标满足一个就进行操作
-                            if (nameFilter != null
-                                && !nameFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
-                                return null;
-                            }
-                        } else {
-                            // 其他情况返回null
-                            return null;
-                        }
-                    }
+                schemaName = result.getSchemaName();
+                tableName = result.getTableName();
+            }
 
 
-                    if (nameBlackFilter != null && nameBlackFilter.filter(name)) {
-                        if (result.getType() == EventType.RENAME) {
-                            // rename校验只要源和目标满足一个就进行操作
-                            if (nameBlackFilter != null
-                                && nameBlackFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
-                                return null;
-                            }
-                        } else {
-                            // 其他情况返回null
-                            return null;
-                        }
-                    }
-                }
-            } else if (result.getType() == EventType.INSERT || result.getType() == EventType.UPDATE
-                       || result.getType() == EventType.DELETE) {
-                // 对外返回,保证兼容,还是返回QUERY类型,这里暂不解析tableName,所以无法支持过滤
-                if (filterQueryDml) {
-                    return null;
-                }
-            } else if (filterQueryDcl) {
+            if (!notFilter) {
+                // 如果是过滤的数据就不处理了
                 return null;
                 return null;
             }
             }
 
 
-            // 更新下table meta cache
-            //if (tableMetaCache != null
-            //    && (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE || result.getType() == EventType.RENAME)) {
-            //    for (DdlResult renameResult = result; renameResult != null; renameResult = renameResult.getRenameTableResult()) {
-            //        String schemaName0 = event.getDbName(); // 防止rename语句后产生schema变更带来影响
-            //        if (StringUtils.isNotEmpty(renameResult.getSchemaName())) {
-            //            schemaName0 = renameResult.getSchemaName();
-            //        }
-            //
-            //        tableName = renameResult.getTableName();
-            //        if (StringUtils.isNotEmpty(tableName)) {
-            //            // 如果解析到了正确的表信息,则根据全名进行清除
-            //            tableMetaCache.clearTableMeta(schemaName0, tableName);
-            //        } else {
-            //            // 如果无法解析正确的表信息,则根据schema进行清除
-            //            tableMetaCache.clearTableMetaWithSchemaName(schemaName0);
-            //        }
-            //    }
-            //}
-
-            //使用新的表结构元数据管理方式
-            BinlogPosition position = createPosition(event.getHeader());
-            tableMetaCache.apply(position, event.getDbName(), queryString);
+            if (!isSeek) {
+                // 使用新的表结构元数据管理方式
+                BinlogPosition position = createPosition(event.getHeader());
+                tableMetaCache.apply(position, event.getDbName(), queryString);
+            }
 
 
             Header header = createHeader(binlogFileName, event.getHeader(), schemaName, tableName, type);
             Header header = createHeader(binlogFileName, event.getHeader(), schemaName, tableName, type);
             RowChange.Builder rowChangeBuider = RowChange.newBuilder();
             RowChange.Builder rowChangeBuider = RowChange.newBuilder();
-            if (result.getType() != EventType.QUERY) {
+            if (type != EventType.QUERY) {
                 rowChangeBuider.setIsDdl(true);
                 rowChangeBuider.setIsDdl(true);
             }
             }
             rowChangeBuider.setSql(queryString);
             rowChangeBuider.setSql(queryString);
             if (StringUtils.isNotEmpty(event.getDbName())) {// 可能为空
             if (StringUtils.isNotEmpty(event.getDbName())) {// 可能为空
                 rowChangeBuider.setDdlSchemaName(event.getDbName());
                 rowChangeBuider.setDdlSchemaName(event.getDbName());
             }
             }
-            rowChangeBuider.setEventType(result.getType());
+            rowChangeBuider.setEventType(type);
             return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
             return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
         }
         }
     }
     }
 
 
+    private boolean processFilter(String queryString, DdlResult result) {
+        String schemaName = result.getSchemaName();
+        String tableName = result.getTableName();
+        // fixed issue https://github.com/alibaba/canal/issues/58
+        // 更新下table meta cache
+        if (tableMetaCache != null
+            && (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE || result.getType() == EventType.RENAME)) {
+            // 对外返回,保证兼容,还是返回QUERY类型,这里暂不解析tableName,所以无法支持过滤
+            for (DdlResult renameResult = result; renameResult != null; renameResult = renameResult.getRenameTableResult()) {
+                String schemaName0 = renameResult.getSchemaName();
+                String tableName0 = renameResult.getTableName();
+                if (StringUtils.isNotEmpty(tableName0)) {
+                    // 如果解析到了正确的表信息,则根据全名进行清除
+                    tableMetaCache.clearTableMeta(schemaName0, tableName0);
+                } else {
+                    // 如果无法解析正确的表信息,则根据schema进行清除
+                    tableMetaCache.clearTableMetaWithSchemaName(schemaName0);
+                }
+            }
+        }
+
+        // fixed issue https://github.com/alibaba/canal/issues/58
+        if (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE
+            || result.getType() == EventType.CREATE || result.getType() == EventType.TRUNCATE
+            || result.getType() == EventType.RENAME || result.getType() == EventType.CINDEX
+            || result.getType() == EventType.DINDEX) { // 针对DDL类型
+
+            if (filterQueryDdl) {
+                return true;
+            }
+
+            if (StringUtils.isEmpty(tableName)
+                || (result.getType() == EventType.RENAME && StringUtils.isEmpty(result.getOriTableName()))) {
+                // 如果解析不出tableName,记录一下日志,方便bugfix,目前直接抛出异常,中断解析
+                throw new CanalParseException("SimpleDdlParser process query failed. pls submit issue with this queryString: "
+                                              + queryString + " , and DdlResult: " + result.toString());
+                // return null;
+            } else {
+                // check name filter
+                String name = schemaName + "." + tableName;
+                if (nameFilter != null && !nameFilter.filter(name)) {
+                    if (result.getType() == EventType.RENAME) {
+                        // rename校验只要源和目标满足一个就进行操作
+                        if (nameFilter != null
+                            && !nameFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
+                            return true;
+                        }
+                    } else {
+                        // 其他情况返回null
+                        return true;
+                    }
+                }
+
+                if (nameBlackFilter != null && nameBlackFilter.filter(name)) {
+                    if (result.getType() == EventType.RENAME) {
+                        // rename校验只要源和目标满足一个就进行操作
+                        if (nameBlackFilter != null
+                            && nameBlackFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
+                            return true;
+                        }
+                    } else {
+                        // 其他情况返回null
+                        return true;
+                    }
+                }
+            }
+        } else if (result.getType() == EventType.INSERT || result.getType() == EventType.UPDATE
+                   || result.getType() == EventType.DELETE) {
+            // 对外返回,保证兼容,还是返回QUERY类型,这里暂不解析tableName,所以无法支持过滤
+            if (filterQueryDml) {
+                return true;
+            }
+        } else if (filterQueryDcl) {
+            return true;
+        }
+
+        return false;
+    }
+
     private Entry parseRowsQueryEvent(RowsQueryLogEvent event) {
     private Entry parseRowsQueryEvent(RowsQueryLogEvent event) {
         if (filterQueryDml) {
         if (filterQueryDml) {
             return null;
             return null;
@@ -366,7 +398,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             boolean tableError = false;
             boolean tableError = false;
             TableMeta tableMeta = null;
             TableMeta tableMeta = null;
             if (tableMetaCache != null) {// 入错存在table meta cache
             if (tableMetaCache != null) {// 入错存在table meta cache
-                tableMeta = getTableMeta(table.getDbName(), table.getTableName(), true,position);
+                tableMeta = getTableMeta(table.getDbName(), table.getTableName(), true, position);
                 if (tableMeta == null) {
                 if (tableMeta == null) {
                     tableError = true;
                     tableError = true;
                     if (!filterTableError) {
                     if (!filterTableError) {
@@ -444,7 +476,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 // 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
                 // 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
                 // 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
                 // 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
                 // 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
                 // 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
-                tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false,position);// 强制重新获取一次
+                tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false, position);// 强制重新获取一次
                 if (tableMeta == null) {
                 if (tableMeta == null) {
                     tableError = true;
                     tableError = true;
                     if (!filterTableError) {
                     if (!filterTableError) {
@@ -686,7 +718,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
 
     private TableMeta getTableMeta(String dbName, String tbName, boolean useCache, BinlogPosition position) {
     private TableMeta getTableMeta(String dbName, String tbName, boolean useCache, BinlogPosition position) {
         try {
         try {
-            return tableMetaCache.getTableMeta(dbName, tbName, useCache,position);
+            return tableMetaCache.getTableMeta(dbName, tbName, useCache, position);
         } catch (Exception e) {
         } catch (Exception e) {
             String message = ExceptionUtils.getRootCauseMessage(e);
             String message = ExceptionUtils.getRootCauseMessage(e);
             if (filterTableError) {
             if (filterTableError) {

+ 86 - 79
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -1,24 +1,24 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
 package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.taobao.tddl.dbsync.binlog.BinlogPosition;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.ProcessJdbcResult;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.taobao.tddl.dbsync.binlog.BinlogPosition;
 
 
 /**
 /**
  * 处理table meta解析和缓存
  * 处理table meta解析和缓存
@@ -28,57 +28,23 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager;
  */
  */
 public class TableMetaCache {
 public class TableMetaCache {
 
 
-    public static final String COLUMN_NAME = "COLUMN_NAME";
-    public static final String COLUMN_TYPE = "COLUMN_TYPE";
-    public static final String IS_NULLABLE = "IS_NULLABLE";
-    public static final String COLUMN_KEY = "COLUMN_KEY";
-    public static final String COLUMN_DEFAULT = "COLUMN_DEFAULT";
-    public static final String EXTRA = "EXTRA";
-    private MysqlConnection connection;
-    private boolean isOnRDS = false;
+    public static final String              COLUMN_NAME    = "COLUMN_NAME";
+    public static final String              COLUMN_TYPE    = "COLUMN_TYPE";
+    public static final String              IS_NULLABLE    = "IS_NULLABLE";
+    public static final String              COLUMN_KEY     = "COLUMN_KEY";
+    public static final String              COLUMN_DEFAULT = "COLUMN_DEFAULT";
+    public static final String              EXTRA          = "EXTRA";
+    private MysqlConnection                 connection;
+    private boolean                         isOnRDS        = false;
 
 
-    private TableMetaManager tableMetaManager;
+    private TableMetaManager                tableMetaManager;
     // 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
     // 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
     private LoadingCache<String, TableMeta> tableMetaDB;
     private LoadingCache<String, TableMeta> tableMetaDB;
 
 
-
-    // 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
-    private Map<String, TableMeta> tableMetaCache;
-
-    /**
-     * 从db获取表结构
-     * @param fullname
-     * @return
-     */
-    private TableMeta getTableMetaByDB(final String fullname) {
-        return connection.query("desc " + fullname, new ProcessJdbcResult<TableMeta>() {
-
-            @Override
-            public TableMeta process(ResultSet rs) throws SQLException {
-                List<FieldMeta> metas = new ArrayList<FieldMeta>();
-                while (rs.next()) {
-                    FieldMeta meta = new FieldMeta();
-                    // 做一个优化,使用String.intern(),共享String对象,减少内存使用
-                    meta.setColumnName(rs.getString("Field"));
-                    meta.setColumnType(rs.getString("Type"));
-                    meta.setNullable(StringUtils.equalsIgnoreCase(rs.getString("Null"), "YES"));
-                    meta.setKey("PRI".equalsIgnoreCase(rs.getString("Key")));
-                    meta.setDefaultValue(rs.getString("Default"));
-                    metas.add(meta);
-                }
-
-                String[] names = StringUtils.split(fullname, "`.`");
-                String schema = names[0];
-                String table = names[1].substring(0, names[1].length());
-                return new TableMeta(schema, table, metas);
-            }
-        });
-    }
-
-    public TableMetaCache(MysqlConnection con,TableMetaManager tableMetaManager) {
+    public TableMetaCache(MysqlConnection con, TableMetaManager tableMetaManager){
         this.connection = con;
         this.connection = con;
         this.tableMetaManager = tableMetaManager;
         this.tableMetaManager = tableMetaManager;
-        //如果持久存储的表结构为空,从db里面获取下
+        // 如果持久存储的表结构为空,从db里面获取下
         if (tableMetaManager == null) {
         if (tableMetaManager == null) {
             this.tableMetaDB = CacheBuilder.newBuilder().build(new CacheLoader<String, TableMeta>() {
             this.tableMetaDB = CacheBuilder.newBuilder().build(new CacheLoader<String, TableMeta>() {
 
 
@@ -100,16 +66,62 @@ public class TableMetaCache {
             });
             });
         }
         }
 
 
-        isOnRDS = connection.query("show global variables  like 'rds\\_%'", new ProcessJdbcResult<Boolean>() {
-
-            @Override
-            public Boolean process(ResultSet rs) throws SQLException {
-                if (rs.next()) {
-                    return true;
-                }
-                return false;
+        try {
+            ResultSetPacket packet = connection.query("show global variables  like 'rds\\_%'");
+            if (packet.getFieldValues().size() > 0) {
+                isOnRDS = true;
             }
             }
-        });
+        } catch (IOException e) {
+        }
+    }
+
+    private TableMeta getTableMetaByDB(String fullname) throws IOException {
+        ResultSetPacket packet = connection.query("desc " + fullname);
+        String[] names = StringUtils.split(fullname, "`.`");
+        String schema = names[0];
+        String table = names[1].substring(0, names[1].length());
+        return new TableMeta(schema, table, parserTableMeta(packet));
+    }
+
+    public static List<FieldMeta> parserTableMeta(ResultSetPacket packet) {
+        Map<String, Integer> nameMaps = new HashMap<String, Integer>(6, 1f);
+
+        int index = 0;
+        for (FieldPacket fieldPacket : packet.getFieldDescriptors()) {
+            nameMaps.put(fieldPacket.getOriginalName(), index++);
+        }
+
+        int size = packet.getFieldDescriptors().size();
+        int count = packet.getFieldValues().size() / packet.getFieldDescriptors().size();
+        List<FieldMeta> result = new ArrayList<FieldMeta>();
+        for (int i = 0; i < count; i++) {
+            FieldMeta meta = new FieldMeta();
+            // 做一个优化,使用String.intern(),共享String对象,减少内存使用
+            meta.setColumnName(packet.getFieldValues().get(nameMaps.get(COLUMN_NAME) + i * size).intern());
+            meta.setColumnType(packet.getFieldValues().get(nameMaps.get(COLUMN_TYPE) + i * size));
+            meta.setNullable(StringUtils.equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(IS_NULLABLE) + i
+                                                                                      * size),
+                "YES"));
+            meta.setKey("PRI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
+            meta.setDefaultValue(packet.getFieldValues().get(nameMaps.get(COLUMN_DEFAULT) + i * size));
+            meta.setExtra(packet.getFieldValues().get(nameMaps.get(EXTRA) + i * size));
+
+            result.add(meta);
+        }
+
+        return result;
+    }
+
+    public TableMeta getTableMeta(String schema, String table) {
+        return getTableMeta(schema, table, true);
+    }
+
+    public TableMeta getTableMeta(String schema, String table, boolean useCache) {
+        if (!useCache) {
+            tableMetaDB.invalidate(getFullName(schema, table));
+        }
+
+        return tableMetaDB.getUnchecked(getFullName(schema, table));
     }
     }
 
 
     public TableMeta getTableMeta(String schema, String table, BinlogPosition position) {
     public TableMeta getTableMeta(String schema, String table, BinlogPosition position) {
@@ -122,20 +134,19 @@ public class TableMetaCache {
             tableMeta = tableMetaManager.find(schema, table);
             tableMeta = tableMetaManager.find(schema, table);
             if (tableMeta == null) {
             if (tableMeta == null) {
                 // 因为条件变化,可能第一次的tableMeta没取到,需要从db获取一次,并记录到snapshot中
                 // 因为条件变化,可能第一次的tableMeta没取到,需要从db获取一次,并记录到snapshot中
-                String createDDL = connection.query("show create table " + getFullName(schema, table),
-                    new ProcessJdbcResult<String>() {
-
-                        @Override
-                        public String process(ResultSet rs) throws SQLException {
-                            while (rs.next()) {
-                                return rs.getString(2);
-                            }
-                            return null;
-                        }
-                    });
-                // 强制覆盖掉内存值
-                tableMetaManager.apply(position, schema, createDDL);
-                tableMeta = tableMetaManager.find(schema, table);
+                String fullName = getFullName(schema, table);
+                try {
+                    ResultSetPacket packet = connection.query("show create table " + fullName);
+                    String createDDL = null;
+                    if (packet.getFieldValues().size() > 0) {
+                        createDDL = packet.getFieldValues().get(1);
+                    }
+                    // 强制覆盖掉内存值
+                    tableMetaManager.apply(position, schema, createDDL);
+                    tableMeta = tableMetaManager.find(schema, table);
+                } catch (IOException e) {
+                    throw new CanalParseException("fetch failed by table meta:" + fullName, e);
+                }
             }
             }
             return tableMeta;
             return tableMeta;
         } else {
         } else {
@@ -147,8 +158,6 @@ public class TableMetaCache {
         }
         }
     }
     }
 
 
-
-
     public void clearTableMeta(String schema, String table) {
     public void clearTableMeta(String schema, String table) {
         if (tableMetaManager != null) {
         if (tableMetaManager != null) {
             // tsdb不需要做,会基于ddl sql自动清理
             // tsdb不需要做,会基于ddl sql自动清理
@@ -195,8 +204,6 @@ public class TableMetaCache {
         }
         }
     }
     }
 
 
-
-
     private String getFullName(String schema, String table) {
     private String getFullName(String schema, String table) {
         StringBuilder builder = new StringBuilder();
         StringBuilder builder = new StringBuilder();
         return builder.append('`')
         return builder.append('`')

+ 19 - 20
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DdlResult.java

@@ -1,6 +1,6 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.ddl;
 package com.alibaba.otter.canal.parse.inbound.mysql.ddl;
 
 
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DBMSAction;
+import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
 
 
 /**
 /**
  * @author agapple 2017年8月1日 下午7:30:42
  * @author agapple 2017年8月1日 下午7:30:42
@@ -8,30 +8,30 @@ import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DBMSAction;
  */
  */
 public class DdlResult {
 public class DdlResult {
 
 
-    private String schemaName;
-    private String tableName;
-    private String oriSchemaName; // rename ddl中的源表
-    private String oriTableName; // rename ddl中的目标表
-    private DBMSAction type;
+    private String    schemaName;
+    private String    tableName;
+    private String    oriSchemaName;    // rename ddl中的源表
+    private String    oriTableName;     // rename ddl中的目标表
+    private EventType type;
     private DdlResult renameTableResult; // 多个rename table的存储
     private DdlResult renameTableResult; // 多个rename table的存储
 
 
     /*
     /*
      * RENAME TABLE tbl_name TO new_tbl_name [, tbl_name2 TO new_tbl_name2] ...
      * RENAME TABLE tbl_name TO new_tbl_name [, tbl_name2 TO new_tbl_name2] ...
      */
      */
 
 
-    public DdlResult() {
+    public DdlResult(){
     }
     }
 
 
-    public DdlResult(String schemaName) {
+    public DdlResult(String schemaName){
         this.schemaName = schemaName;
         this.schemaName = schemaName;
     }
     }
 
 
-    public DdlResult(String schemaName, String tableName) {
+    public DdlResult(String schemaName, String tableName){
         this.schemaName = schemaName;
         this.schemaName = schemaName;
         this.tableName = tableName;
         this.tableName = tableName;
     }
     }
 
 
-    public DdlResult(String schemaName, String tableName, String oriSchemaName, String oriTableName) {
+    public DdlResult(String schemaName, String tableName, String oriSchemaName, String oriTableName){
         this.schemaName = schemaName;
         this.schemaName = schemaName;
         this.tableName = tableName;
         this.tableName = tableName;
         this.oriSchemaName = oriSchemaName;
         this.oriSchemaName = oriSchemaName;
@@ -54,11 +54,11 @@ public class DdlResult {
         this.tableName = tableName;
         this.tableName = tableName;
     }
     }
 
 
-    public DBMSAction getType() {
+    public EventType getType() {
         return type;
         return type;
     }
     }
 
 
-    public void setType(DBMSAction type) {
+    public void setType(EventType type) {
         this.type = type;
         this.type = type;
     }
     }
 
 
@@ -93,7 +93,7 @@ public class DdlResult {
         result.setOriTableName(oriTableName);
         result.setOriTableName(oriTableName);
         result.setSchemaName(schemaName);
         result.setSchemaName(schemaName);
         result.setTableName(tableName);
         result.setTableName(tableName);
-        //result.setType(type);
+        // result.setType(type);
         return result;
         return result;
     }
     }
 
 
@@ -102,13 +102,12 @@ public class DdlResult {
         DdlResult ddlResult = this;
         DdlResult ddlResult = this;
         StringBuffer sb = new StringBuffer();
         StringBuffer sb = new StringBuffer();
         do {
         do {
-            sb.append(String
-                .format("DdlResult [schemaName=%s , tableName=%s , oriSchemaName=%s , oriTableName=%s , type=%s ];",
-                    ddlResult.schemaName,
-                    ddlResult.tableName,
-                    ddlResult.oriSchemaName,
-                    ddlResult.oriTableName,
-                    ddlResult.type));
+            sb.append(String.format("DdlResult [schemaName=%s , tableName=%s , oriSchemaName=%s , oriTableName=%s , type=%s ];",
+                ddlResult.schemaName,
+                ddlResult.tableName,
+                ddlResult.oriSchemaName,
+                ddlResult.oriTableName,
+                ddlResult.type));
             ddlResult = ddlResult.renameTableResult;
             ddlResult = ddlResult.renameTableResult;
         } while (ddlResult != null);
         } while (ddlResult != null);
         return sb.toString();
         return sb.toString();

+ 17 - 17
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DruidDdlParser.java

@@ -33,11 +33,11 @@ import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlRenameTableStateme
 import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlRenameTableStatement.Item;
 import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlRenameTableStatement.Item;
 import com.alibaba.druid.sql.parser.ParserException;
 import com.alibaba.druid.sql.parser.ParserException;
 import com.alibaba.druid.util.JdbcConstants;
 import com.alibaba.druid.util.JdbcConstants;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DBMSAction;
+import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
 
 
 /**
 /**
  * @author agapple 2017年7月27日 下午4:05:34
  * @author agapple 2017年7月27日 下午4:05:34
- * @since 3.2.5
+ * @since 1.0.25
  */
  */
 public class DruidDdlParser {
 public class DruidDdlParser {
 
 
@@ -48,7 +48,7 @@ public class DruidDdlParser {
         } catch (ParserException e) {
         } catch (ParserException e) {
             // 可能存在一些SQL是不支持的,比如存储过程
             // 可能存在一些SQL是不支持的,比如存储过程
             DdlResult ddlResult = new DdlResult();
             DdlResult ddlResult = new DdlResult();
-            ddlResult.setType(DBMSAction.QUERY);
+            ddlResult.setType(EventType.QUERY);
             return Arrays.asList(ddlResult);
             return Arrays.asList(ddlResult);
         }
         }
 
 
@@ -58,7 +58,7 @@ public class DruidDdlParser {
                 DdlResult ddlResult = new DdlResult();
                 DdlResult ddlResult = new DdlResult();
                 SQLCreateTableStatement createTable = (SQLCreateTableStatement) statement;
                 SQLCreateTableStatement createTable = (SQLCreateTableStatement) statement;
                 processName(ddlResult, schmeaName, createTable.getName(), false);
                 processName(ddlResult, schmeaName, createTable.getName(), false);
-                ddlResult.setType(DBMSAction.CREATE);
+                ddlResult.setType(EventType.CREATE);
                 ddlResults.add(ddlResult);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLAlterTableStatement) {
             } else if (statement instanceof SQLAlterTableStatement) {
                 SQLAlterTableStatement alterTable = (SQLAlterTableStatement) statement;
                 SQLAlterTableStatement alterTable = (SQLAlterTableStatement) statement;
@@ -71,30 +71,30 @@ public class DruidDdlParser {
                     } else if (item instanceof SQLAlterTableAddIndex) {
                     } else if (item instanceof SQLAlterTableAddIndex) {
                         DdlResult ddlResult = new DdlResult();
                         DdlResult ddlResult = new DdlResult();
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
-                        ddlResult.setType(DBMSAction.CINDEX);
+                        ddlResult.setType(EventType.CINDEX);
                         ddlResults.add(ddlResult);
                         ddlResults.add(ddlResult);
                     } else if (item instanceof SQLAlterTableDropIndex || item instanceof SQLAlterTableDropKey) {
                     } else if (item instanceof SQLAlterTableDropIndex || item instanceof SQLAlterTableDropKey) {
                         DdlResult ddlResult = new DdlResult();
                         DdlResult ddlResult = new DdlResult();
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
-                        ddlResult.setType(DBMSAction.DINDEX);
+                        ddlResult.setType(EventType.DINDEX);
                         ddlResults.add(ddlResult);
                         ddlResults.add(ddlResult);
                     } else if (item instanceof SQLAlterTableAddConstraint) {
                     } else if (item instanceof SQLAlterTableAddConstraint) {
                         DdlResult ddlResult = new DdlResult();
                         DdlResult ddlResult = new DdlResult();
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
                         SQLConstraint constraint = ((SQLAlterTableAddConstraint) item).getConstraint();
                         SQLConstraint constraint = ((SQLAlterTableAddConstraint) item).getConstraint();
                         if (constraint instanceof SQLUnique) {
                         if (constraint instanceof SQLUnique) {
-                            ddlResult.setType(DBMSAction.CINDEX);
+                            ddlResult.setType(EventType.CINDEX);
                             ddlResults.add(ddlResult);
                             ddlResults.add(ddlResult);
                         }
                         }
                     } else if (item instanceof SQLAlterTableDropConstraint) {
                     } else if (item instanceof SQLAlterTableDropConstraint) {
                         DdlResult ddlResult = new DdlResult();
                         DdlResult ddlResult = new DdlResult();
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
-                        ddlResult.setType(DBMSAction.DINDEX);
+                        ddlResult.setType(EventType.DINDEX);
                         ddlResults.add(ddlResult);
                         ddlResults.add(ddlResult);
                     } else {
                     } else {
                         DdlResult ddlResult = new DdlResult();
                         DdlResult ddlResult = new DdlResult();
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
                         processName(ddlResult, schmeaName, alterTable.getName(), false);
-                        ddlResult.setType(DBMSAction.ALTER);
+                        ddlResult.setType(EventType.ALTER);
                         ddlResults.add(ddlResult);
                         ddlResults.add(ddlResult);
                     }
                     }
                 }
                 }
@@ -103,7 +103,7 @@ public class DruidDdlParser {
                 for (SQLExprTableSource tableSource : dropTable.getTableSources()) {
                 for (SQLExprTableSource tableSource : dropTable.getTableSources()) {
                     DdlResult ddlResult = new DdlResult();
                     DdlResult ddlResult = new DdlResult();
                     processName(ddlResult, schmeaName, tableSource.getExpr(), false);
                     processName(ddlResult, schmeaName, tableSource.getExpr(), false);
-                    ddlResult.setType(DBMSAction.ERASE);
+                    ddlResult.setType(EventType.ERASE);
                     ddlResults.add(ddlResult);
                     ddlResults.add(ddlResult);
                 }
                 }
             } else if (statement instanceof SQLCreateIndexStatement) {
             } else if (statement instanceof SQLCreateIndexStatement) {
@@ -111,21 +111,21 @@ public class DruidDdlParser {
                 SQLTableSource tableSource = createIndex.getTable();
                 SQLTableSource tableSource = createIndex.getTable();
                 DdlResult ddlResult = new DdlResult();
                 DdlResult ddlResult = new DdlResult();
                 processName(ddlResult, schmeaName, ((SQLExprTableSource) tableSource).getExpr(), false);
                 processName(ddlResult, schmeaName, ((SQLExprTableSource) tableSource).getExpr(), false);
-                ddlResult.setType(DBMSAction.CINDEX);
+                ddlResult.setType(EventType.CINDEX);
                 ddlResults.add(ddlResult);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLDropIndexStatement) {
             } else if (statement instanceof SQLDropIndexStatement) {
                 SQLDropIndexStatement dropIndex = (SQLDropIndexStatement) statement;
                 SQLDropIndexStatement dropIndex = (SQLDropIndexStatement) statement;
                 SQLExprTableSource tableSource = dropIndex.getTableName();
                 SQLExprTableSource tableSource = dropIndex.getTableName();
                 DdlResult ddlResult = new DdlResult();
                 DdlResult ddlResult = new DdlResult();
                 processName(ddlResult, schmeaName, tableSource.getExpr(), false);
                 processName(ddlResult, schmeaName, tableSource.getExpr(), false);
-                ddlResult.setType(DBMSAction.DINDEX);
+                ddlResult.setType(EventType.DINDEX);
                 ddlResults.add(ddlResult);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLTruncateStatement) {
             } else if (statement instanceof SQLTruncateStatement) {
                 SQLTruncateStatement truncate = (SQLTruncateStatement) statement;
                 SQLTruncateStatement truncate = (SQLTruncateStatement) statement;
                 for (SQLExprTableSource tableSource : truncate.getTableSources()) {
                 for (SQLExprTableSource tableSource : truncate.getTableSources()) {
                     DdlResult ddlResult = new DdlResult();
                     DdlResult ddlResult = new DdlResult();
                     processName(ddlResult, schmeaName, tableSource.getExpr(), false);
                     processName(ddlResult, schmeaName, tableSource.getExpr(), false);
-                    ddlResult.setType(DBMSAction.TRUNCATE);
+                    ddlResult.setType(EventType.TRUNCATE);
                     ddlResults.add(ddlResult);
                     ddlResults.add(ddlResult);
                 }
                 }
             } else if (statement instanceof MySqlRenameTableStatement) {
             } else if (statement instanceof MySqlRenameTableStatement) {
@@ -134,28 +134,28 @@ public class DruidDdlParser {
                     DdlResult ddlResult = new DdlResult();
                     DdlResult ddlResult = new DdlResult();
                     processName(ddlResult, schmeaName, item.getName(), true);
                     processName(ddlResult, schmeaName, item.getName(), true);
                     processName(ddlResult, schmeaName, item.getTo(), false);
                     processName(ddlResult, schmeaName, item.getTo(), false);
-                    ddlResult.setType(DBMSAction.RENAME);
+                    ddlResult.setType(EventType.RENAME);
                     ddlResults.add(ddlResult);
                     ddlResults.add(ddlResult);
                 }
                 }
             } else if (statement instanceof SQLInsertStatement) {
             } else if (statement instanceof SQLInsertStatement) {
                 DdlResult ddlResult = new DdlResult();
                 DdlResult ddlResult = new DdlResult();
                 SQLInsertStatement insert = (SQLInsertStatement) statement;
                 SQLInsertStatement insert = (SQLInsertStatement) statement;
                 processName(ddlResult, schmeaName, insert.getTableName(), true);
                 processName(ddlResult, schmeaName, insert.getTableName(), true);
-                ddlResult.setType(DBMSAction.INSERT);
+                ddlResult.setType(EventType.INSERT);
                 ddlResults.add(ddlResult);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLUpdateStatement) {
             } else if (statement instanceof SQLUpdateStatement) {
                 DdlResult ddlResult = new DdlResult();
                 DdlResult ddlResult = new DdlResult();
                 SQLUpdateStatement update = (SQLUpdateStatement) statement;
                 SQLUpdateStatement update = (SQLUpdateStatement) statement;
                 // 拿到的表名可能为null,比如update a,b set a.id=x
                 // 拿到的表名可能为null,比如update a,b set a.id=x
                 processName(ddlResult, schmeaName, update.getTableName(), true);
                 processName(ddlResult, schmeaName, update.getTableName(), true);
-                ddlResult.setType(DBMSAction.UPDATE);
+                ddlResult.setType(EventType.UPDATE);
                 ddlResults.add(ddlResult);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLDeleteStatement) {
             } else if (statement instanceof SQLDeleteStatement) {
                 DdlResult ddlResult = new DdlResult();
                 DdlResult ddlResult = new DdlResult();
                 SQLDeleteStatement delete = (SQLDeleteStatement) statement;
                 SQLDeleteStatement delete = (SQLDeleteStatement) statement;
                 // 拿到的表名可能为null,比如delete a,b from a where a.id = b.id
                 // 拿到的表名可能为null,比如delete a,b from a where a.id = b.id
                 processName(ddlResult, schmeaName, delete.getTableName(), true);
                 processName(ddlResult, schmeaName, delete.getTableName(), true);
-                ddlResult.setType(DBMSAction.DELETE);
+                ddlResult.setType(EventType.DELETE);
                 ddlResults.add(ddlResult);
                 ddlResults.add(ddlResult);
             }
             }
         }
         }

+ 1 - 99
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/SimpleDdlParser.java → parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/SimpleDdlParser.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
+package com.alibaba.otter.canal.parse.inbound.mysql.ddl;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.oro.text.regex.Perl5Matcher;
 import org.apache.oro.text.regex.Perl5Matcher;
@@ -218,102 +218,4 @@ public class SimpleDdlParser {
             sql = sb.toString();
             sql = sb.toString();
         }
         }
     }
     }
-
-    public static class DdlResult {
-
-        private String    schemaName;
-        private String    tableName;
-        private String    oriSchemaName;    // rename ddl中的源表
-        private String    oriTableName;     // rename ddl中的目标表
-        private EventType type;
-        private DdlResult renameTableResult; // 多个rename table的存储
-
-        /*
-         * RENAME TABLE tbl_name TO new_tbl_name [, tbl_name2 TO new_tbl_name2]
-         * ...
-         */
-
-        public DdlResult(){
-        }
-
-        public DdlResult(String schemaName){
-            this.schemaName = schemaName;
-        }
-
-        public DdlResult(String schemaName, String tableName){
-            this.schemaName = schemaName;
-            this.tableName = tableName;
-        }
-
-        public DdlResult(String schemaName, String tableName, String oriSchemaName, String oriTableName){
-            this.schemaName = schemaName;
-            this.tableName = tableName;
-            this.oriSchemaName = oriSchemaName;
-            this.oriTableName = oriTableName;
-        }
-
-        public String getSchemaName() {
-            return schemaName;
-        }
-
-        public void setSchemaName(String schemaName) {
-            this.schemaName = schemaName;
-        }
-
-        public String getTableName() {
-            return tableName;
-        }
-
-        public void setTableName(String tableName) {
-            this.tableName = tableName;
-        }
-
-        public EventType getType() {
-            return type;
-        }
-
-        public void setType(EventType type) {
-            this.type = type;
-        }
-
-        public String getOriSchemaName() {
-            return oriSchemaName;
-        }
-
-        public void setOriSchemaName(String oriSchemaName) {
-            this.oriSchemaName = oriSchemaName;
-        }
-
-        public String getOriTableName() {
-            return oriTableName;
-        }
-
-        public void setOriTableName(String oriTableName) {
-            this.oriTableName = oriTableName;
-        }
-
-        public DdlResult getRenameTableResult() {
-            return renameTableResult;
-        }
-
-        public void setRenameTableResult(DdlResult renameTableResult) {
-            this.renameTableResult = renameTableResult;
-        }
-
-        @Override
-        public String toString() {
-            DdlResult ddlResult = this;
-            StringBuffer sb = new StringBuffer();
-            do {
-                sb.append(String.format("DdlResult [schemaName=%s , tableName=%s , oriSchemaName=%s , oriTableName=%s , type=%s ];",
-                    ddlResult.schemaName,
-                    ddlResult.tableName,
-                    ddlResult.oriSchemaName,
-                    ddlResult.oriTableName,
-                    ddlResult.type));
-                ddlResult = ddlResult.renameTableResult;
-            } while (ddlResult != null);
-            return sb.toString();
-        }
-    }
 }
 }

+ 25 - 27
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DataSourceFactoryTSDB.java

@@ -1,43 +1,41 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
 
 
-import java.sql.SQLException;
-
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.druid.pool.vendor.MySqlExceptionSorter;
+import com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
 /**
- * Created by wanshao
- * Date: 2017/9/22
- * Time: 下午2:46
+ * Created by wanshao Date: 2017/9/22 Time: 下午2:46
  **/
  **/
 public class DataSourceFactoryTSDB {
 public class DataSourceFactoryTSDB {
 
 
-    private static Logger logger = LoggerFactory.getLogger(DataSourceFactoryTSDB.class);
-
-    public static DataSource getDataSource(String address, String userName, String password,
-                                           boolean enable,
-                                           String defaultDatabaseName, String url, String driverClassName) {
-
-        DruidDataSource druidDataSource = new DruidDataSource();
-        druidDataSource.setUrl(url);
-        druidDataSource.setUsername(userName);
-        druidDataSource.setPassword(password);
-
+    public static DataSource getDataSource(String url, String userName, String password) {
         try {
         try {
+            DruidDataSource druidDataSource = new DruidDataSource();
+            druidDataSource.setUrl(url);
+            druidDataSource.setUsername(userName);
+            druidDataSource.setPassword(password);
+            druidDataSource.setTestWhileIdle(true);
+            druidDataSource.setTestOnBorrow(false);
+            druidDataSource.setTestOnReturn(false);
+            druidDataSource.setNotFullTimeoutRetryCount(2);
+            druidDataSource.setValidConnectionCheckerClassName(MySqlValidConnectionChecker.class.getName());
+            druidDataSource.setExceptionSorterClassName(MySqlExceptionSorter.class.getName());
+            druidDataSource.setValidationQuery("SELECT 1");
+            druidDataSource.setInitialSize(1);
+            druidDataSource.setMinIdle(1);
+            druidDataSource.setMaxActive(30);
+            druidDataSource.setMaxWait(10 * 1000);
+            druidDataSource.setTimeBetweenEvictionRunsMillis(60 * 1000);
+            druidDataSource.setMinEvictableIdleTimeMillis(50 * 1000);
+            druidDataSource.setUseUnfairLock(true);
             druidDataSource.init();
             druidDataSource.init();
-        } catch (SQLException e) {
-            logger.error("druidDataSource.init", e);
-            throw new CanalParseException("初始化druid dataSource出错");
+            return druidDataSource;
+        } catch (Throwable e) {
+            throw new CanalParseException("init druidDataSource failed", e);
         }
         }
-        return druidDataSource;
     }
     }
-
-
 }
 }

+ 3 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -45,8 +45,8 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
 public class MemoryTableMeta implements TableMetaTSDB {
 public class MemoryTableMeta implements TableMetaTSDB {
 
 
     private Map<List<String>, TableMeta> tableMetas = new ConcurrentHashMap<List<String>, TableMeta>();
     private Map<List<String>, TableMeta> tableMetas = new ConcurrentHashMap<List<String>, TableMeta>();
-    private SchemaRepository repository = new SchemaRepository(JdbcConstants.MYSQL);
-    private Logger logger = LoggerFactory.getLogger(MemoryTableMeta.class);
+    private SchemaRepository             repository = new SchemaRepository(JdbcConstants.MYSQL);
+    private Logger                       logger     = LoggerFactory.getLogger(MemoryTableMeta.class);
 
 
     public MemoryTableMeta(Logger logger){
     public MemoryTableMeta(Logger logger){
         this.logger = logger;
         this.logger = logger;
@@ -208,7 +208,7 @@ public class MemoryTableMeta implements TableMetaTSDB {
         if (sqlName instanceof SQLPropertyExpr) {
         if (sqlName instanceof SQLPropertyExpr) {
             SQLIdentifierExpr owner = (SQLIdentifierExpr) ((SQLPropertyExpr) sqlName).getOwner();
             SQLIdentifierExpr owner = (SQLIdentifierExpr) ((SQLPropertyExpr) sqlName).getOwner();
             return DruidDdlParser.unescapeName(owner.getName()) + "."
             return DruidDdlParser.unescapeName(owner.getName()) + "."
-                + DruidDdlParser.unescapeName(((SQLPropertyExpr) sqlName).getName());
+                   + DruidDdlParser.unescapeName(((SQLPropertyExpr) sqlName).getName());
         } else if (sqlName instanceof SQLIdentifierExpr) {
         } else if (sqlName instanceof SQLIdentifierExpr) {
             return DruidDdlParser.unescapeName(((SQLIdentifierExpr) sqlName).getName());
             return DruidDdlParser.unescapeName(((SQLIdentifierExpr) sqlName).getName());
         } else {
         } else {

+ 130 - 240
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaManager.java

@@ -1,8 +1,6 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
 
 
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
@@ -15,58 +13,54 @@ import java.util.regex.Pattern;
 
 
 import javax.annotation.Resource;
 import javax.annotation.Resource;
 
 
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.druid.sql.repository.Schema;
 import com.alibaba.druid.sql.repository.Schema;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.ProcessJdbcResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DdlResult;
 import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DdlResult;
 import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaSnapshotDO;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaSnapshotDO;
-
 import com.taobao.tddl.dbsync.binlog.BinlogPosition;
 import com.taobao.tddl.dbsync.binlog.BinlogPosition;
-import org.apache.commons.beanutils.BeanUtils;
-import org.apache.commons.lang.ObjectUtils;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
 /**
- * 基于console远程管理
- *
- * see internal class: CanalTableMeta , ConsoleTableMetaTSDB
+ * 基于console远程管理 see internal class: CanalTableMeta , ConsoleTableMetaTSDB
  *
  *
  * @author agapple 2017年7月27日 下午10:47:55
  * @author agapple 2017年7月27日 下午10:47:55
  * @since 3.2.5
  * @since 3.2.5
  */
  */
 public class TableMetaManager implements TableMetaTSDB {
 public class TableMetaManager implements TableMetaTSDB {
-    private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
 
 
+    private static Logger               logger        = LoggerFactory.getLogger(TableMetaManager.class);
+    private static Pattern              pattern       = Pattern.compile("Duplicate entry '.*' for key '*'");
     private static final BinlogPosition INIT_POSITION = BinlogPosition.parseFromString("0:0#-2.-1");
     private static final BinlogPosition INIT_POSITION = BinlogPosition.parseFromString("0:0#-2.-1");
-    private Logger logger = LoggerFactory.getLogger(TableMetaManager.class);
-    private String consoleDomain = null;
-    private int retry = 3;
-    private MemoryTableMeta memoryTableMeta;
-    private MysqlConnection connection; // 查询meta信息的链接
-    private CanalEventFilter filter;
-    private BinlogPosition lastPosition;
-    private ScheduledExecutorService scheduler;
+    private MemoryTableMeta             memoryTableMeta;
+    private MysqlConnection             connection;                                                         // 查询meta信息的链接
+    private CanalEventFilter            filter;
+    private BinlogPosition              lastPosition;
+    private ScheduledExecutorService    scheduler;
 
 
     @Resource
     @Resource
-    private MetaHistoryDAO metaHistoryDAO;
+    private MetaHistoryDAO              metaHistoryDAO;
 
 
     @Resource
     @Resource
-    private MetaSnapshotDAO metaSnapshotDAO;
+    private MetaSnapshotDAO             metaSnapshotDAO;
 
 
-    public void init(){
+    public void init() {
         this.memoryTableMeta = new MemoryTableMeta(logger);
         this.memoryTableMeta = new MemoryTableMeta(logger);
         this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
         this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
 
 
@@ -91,7 +85,7 @@ public class TableMetaManager implements TableMetaTSDB {
         }, 24, 24, TimeUnit.SECONDS);
         }, 24, 24, TimeUnit.SECONDS);
     }
     }
 
 
-    public TableMetaManager() {
+    public TableMetaManager(){
 
 
     }
     }
 
 
@@ -147,80 +141,46 @@ public class TableMetaManager implements TableMetaTSDB {
      * 初始化的时候dump一下表结构
      * 初始化的时候dump一下表结构
      */
      */
     private boolean dumpTableMeta(MysqlConnection connection, final CanalEventFilter filter) {
     private boolean dumpTableMeta(MysqlConnection connection, final CanalEventFilter filter) {
-        List<String> schemas = connection.query("show databases", new ProcessJdbcResult<List>() {
-
-            @Override
-            public List process(ResultSet rs) throws SQLException {
-                List<String> schemas = new ArrayList<String>();
-                while (rs.next()) {
-                    String schema = rs.getString(1);
-                    if (!filter.filter(schema)) {
-                        schemas.add(schema);
-                    }
+        try {
+            ResultSetPacket packet = connection.query("show databases");
+            List<String> schemas = new ArrayList<String>();
+            for (String schema : packet.getFieldValues()) {
+                if (!filter.filter(schema)) {
+                    schemas.add(schema);
                 }
                 }
-                return schemas;
             }
             }
-        });
 
 
-        for (String schema : schemas) {
-            List<String> tables = connection.query("show tables from `" + schema + "`", new ProcessJdbcResult<List>() {
-
-                @Override
-                public List process(ResultSet rs) throws SQLException {
-                    List<String> tables = new ArrayList<String>();
-                    while (rs.next()) {
-                        String table = rs.getString(1);
-                        if (!filter.filter(table)) {
-                            tables.add(table);
-                        }
+            for (String schema : schemas) {
+                packet = connection.query("show tables from `" + schema + "`");
+                List<String> tables = new ArrayList<String>();
+                for (String table : packet.getFieldValues()) {
+                    if (!filter.filter(table)) {
+                        tables.add(table);
                     }
                     }
-                    return tables;
                 }
                 }
-            });
-
-            StringBuilder sql = new StringBuilder();
-            for (String table : tables) {
-                sql.append("show create table `" + schema + "`.`" + table + "`;");
-            }
 
 
-            // 使用多语句方式读取
-            Statement stmt = null;
-            try {
-                stmt = connection.getConn().createStatement();
-                ResultSet rs = stmt.executeQuery(sql.toString());
-                boolean existMoreResult = false;
-                do {
-                    if (existMoreResult) {
-                        rs = stmt.getResultSet();
-                    }
+                StringBuilder sql = new StringBuilder();
+                for (String table : tables) {
+                    sql.append("show create table `" + schema + "`.`" + table + "`;");
+                }
 
 
-                    while (rs.next()) {
-                        String oneTableCreateSql = rs.getString(2);
+                List<ResultSetPacket> packets = connection.queryMulti(sql.toString());
+                for (ResultSetPacket onePacket : packets) {
+                    if (onePacket.getFieldValues().size() > 1) {
+                        String oneTableCreateSql = onePacket.getFieldValues().get(1);
                         memoryTableMeta.apply(INIT_POSITION, schema, oneTableCreateSql);
                         memoryTableMeta.apply(INIT_POSITION, schema, oneTableCreateSql);
                     }
                     }
-
-                    existMoreResult = stmt.getMoreResults();
-                } while (existMoreResult);
-            } catch (SQLException e) {
-                throw new CanalParseException(e);
-            } finally {
-                if (stmt != null) {
-                    try {
-                        stmt.close();
-                    } catch (SQLException e) {
-                        // ignore
-                    }
                 }
                 }
             }
             }
-        }
 
 
-        return true;
+            return true;
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
     }
     }
 
 
     private boolean applyHistoryToDB(BinlogPosition position, String schema, String ddl) {
     private boolean applyHistoryToDB(BinlogPosition position, String schema, String ddl) {
-
         Map<String, String> content = new HashMap<String, String>();
         Map<String, String> content = new HashMap<String, String>();
-
         content.put("binlogFile", position.getFileName());
         content.put("binlogFile", position.getFileName());
         content.put("binlogOffest", String.valueOf(position.getPosition()));
         content.put("binlogOffest", String.valueOf(position.getPosition()));
         content.put("binlogMasterId", String.valueOf(position.getMasterId()));
         content.put("binlogMasterId", String.valueOf(position.getMasterId()));
@@ -240,26 +200,23 @@ public class TableMetaManager implements TableMetaTSDB {
             // content.put("extra", "");
             // content.put("extra", "");
         }
         }
 
 
-        for (int i = 0; i < retry; i++) {
-            MetaHistoryDO metaDO = new MetaHistoryDO();
-            try {
-                BeanUtils.populate(metaDO, content);
-                // 会建立唯一约束,解决:
-                // 1. 重复的binlog file+offest
-                // 2. 重复的masterId+timestamp
-                metaHistoryDAO.insert(metaDO);
-            } catch (Throwable e) {
-                if (isUkDuplicateException(e)) {
-                    // 忽略掉重复的位点
-                    logger.warn("dup apply for sql : " + ddl);
-                } else {
-                    throw new RuntimeException("apply history to db failed caused by : " + e.getMessage());
-                }
-
+        MetaHistoryDO metaDO = new MetaHistoryDO();
+        try {
+            BeanUtils.populate(metaDO, content);
+            // 会建立唯一约束,解决:
+            // 1. 重复的binlog file+offest
+            // 2. 重复的masterId+timestamp
+            metaHistoryDAO.insert(metaDO);
+        } catch (Throwable e) {
+            if (isUkDuplicateException(e)) {
+                // 忽略掉重复的位点
+                logger.warn("dup apply for sql : " + ddl);
+            } else {
+                throw new CanalParseException("apply history to db failed caused by : " + e.getMessage());
             }
             }
-            return true;
+
         }
         }
-        return false;
+        return true;
     }
     }
 
 
     /**
     /**
@@ -290,9 +247,7 @@ public class TableMetaManager implements TableMetaTSDB {
             }
             }
         }
         }
         if (compareAll) {
         if (compareAll) {
-
             Map<String, String> content = new HashMap<String, String>();
             Map<String, String> content = new HashMap<String, String>();
-
             content.put("binlogFile", position.getFileName());
             content.put("binlogFile", position.getFileName());
             content.put("binlogOffest", String.valueOf(position.getPosition()));
             content.put("binlogOffest", String.valueOf(position.getPosition()));
             content.put("binlogMasterId", String.valueOf(position.getMasterId()));
             content.put("binlogMasterId", String.valueOf(position.getMasterId()));
@@ -302,24 +257,19 @@ public class TableMetaManager implements TableMetaTSDB {
                 throw new RuntimeException("apply failed caused by content is empty in applySnapshotToDB");
                 throw new RuntimeException("apply failed caused by content is empty in applySnapshotToDB");
             }
             }
 
 
-            for (int i = 0; i < retry; i++) {
-                MetaSnapshotDO snapshotDO = new MetaSnapshotDO();
-                try {
-                    BeanUtils.populate(snapshotDO, content);
-                    metaSnapshotDAO.insert(snapshotDO);
-                } catch (Throwable e) {
-                    if (isUkDuplicateException(e)) {
-                        // 忽略掉重复的位点
-                        logger.warn("dup apply snapshot for data : " + snapshotDO.getData());
-                    } else {
-                        throw new RuntimeException("apply failed caused by : " + e.getMessage());
-                    }
+            MetaSnapshotDO snapshotDO = new MetaSnapshotDO();
+            try {
+                BeanUtils.populate(snapshotDO, content);
+                metaSnapshotDAO.insert(snapshotDO);
+            } catch (Throwable e) {
+                if (isUkDuplicateException(e)) {
+                    // 忽略掉重复的位点
+                    logger.warn("dup apply snapshot for data : " + snapshotDO.getData());
+                } else {
+                    throw new CanalParseException("apply failed caused by : " + e.getMessage());
                 }
                 }
-                return true;
-
             }
             }
-            return false;
-
+            return true;
         } else {
         } else {
             logger.error("compare failed , check log");
             logger.error("compare failed , check log");
         }
         }
@@ -327,26 +277,15 @@ public class TableMetaManager implements TableMetaTSDB {
     }
     }
 
 
     private boolean compareTableMetaDbAndMemory(MysqlConnection connection, final String schema, final String table) {
     private boolean compareTableMetaDbAndMemory(MysqlConnection connection, final String schema, final String table) {
-        TableMeta tableMetaFromDB = connection.query("desc " + getFullName(schema, table),
-            new ProcessJdbcResult<TableMeta>() {
-
-                @Override
-                public TableMeta process(ResultSet rs) throws SQLException {
-                    List<FieldMeta> metas = new ArrayList<FieldMeta>();
-                    while (rs.next()) {
-                        FieldMeta meta = new FieldMeta();
-                        // 做一个优化,使用String.intern(),共享String对象,减少内存使用
-                        meta.setColumnName(rs.getString("Field"));
-                        meta.setColumnType(rs.getString("Type"));
-                        meta.setNullable(StringUtils.equalsIgnoreCase(rs.getString("Null"), "YES"));
-                        meta.setKey("PRI".equalsIgnoreCase(rs.getString("Key")));
-                        meta.setDefaultValue(rs.getString("Default"));
-                        metas.add(meta);
-                    }
-
-                    return new TableMeta(schema, table, metas);
-                }
-            });
+        TableMeta tableMetaFromDB = new TableMeta();
+        tableMetaFromDB.setSchema(schema);
+        tableMetaFromDB.setTable(table);
+        try {
+            ResultSetPacket packet = connection.query("desc " + getFullName(schema, table));
+            tableMetaFromDB.setFields(TableMetaCache.parserTableMeta(packet));
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
 
 
         TableMeta tableMetaFromMem = memoryTableMeta.find(schema, table);
         TableMeta tableMetaFromMem = memoryTableMeta.find(schema, table);
         boolean result = compareTableMeta(tableMetaFromMem, tableMetaFromDB);
         boolean result = compareTableMeta(tableMetaFromMem, tableMetaFromDB);
@@ -358,119 +297,70 @@ public class TableMetaManager implements TableMetaTSDB {
     }
     }
 
 
     private BinlogPosition buildMemFromSnapshot(BinlogPosition position) {
     private BinlogPosition buildMemFromSnapshot(BinlogPosition position) {
-
-        Map<String, String> content = new HashMap<String, String>();
-
-        content.put("binlogFile", position.getFileName());
-        content.put("binlogOffest", String.valueOf(position.getPosition()));
-        content.put("binlogMasterId", String.valueOf(position.getMasterId()));
-        content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
-        if (content.isEmpty()) {
-            throw new RuntimeException("apply failed caused by content is empty in buildMemFromSnapshot");
-        }
-        for (int i = 0; i < retry; i++) {
-
-            try {
-
-                String timestamp = content.get("binlogTimestamp");
-                MetaSnapshotDO snapshotDO = metaSnapshotDAO.findByTimestamp(Long.valueOf(timestamp));
-                JSONObject jsonData = new JSONObject();
-                jsonData.put("content", JSON.toJSONString(snapshotDO));
-                if (jsonData == null) {
-                    // 可能没有任何snapshot数据
+        try {
+            MetaSnapshotDO snapshotDO = metaSnapshotDAO.findByTimestamp(position.getTimestamp());
+            String binlogFile = snapshotDO.getBinlogFile();
+            Long binlogOffest = snapshotDO.getBinlogOffest();
+            String binlogMasterId = snapshotDO.getBinlogMasterId();
+            Long binlogTimestamp = snapshotDO.getBinlogTimestamp();
+
+            BinlogPosition snapshotPosition = new BinlogPosition(binlogFile,
+                binlogOffest == null ? 0l : binlogOffest,
+                Long.valueOf(binlogMasterId == null ? "-2" : binlogMasterId),
+                binlogTimestamp == null ? 0l : binlogTimestamp);
+            // data存储为Map<String,String>,每个分库一套建表
+            String sqlData = snapshotDO.getData();
+            JSONObject jsonObj = JSON.parseObject(sqlData);
+            for (Map.Entry entry : jsonObj.entrySet()) {
+                // 记录到内存
+                if (!memoryTableMeta.apply(snapshotPosition,
+                    ObjectUtils.toString(entry.getKey()),
+                    ObjectUtils.toString(entry.getValue()))) {
                     return null;
                     return null;
                 }
                 }
-
-                String binlogFile = jsonData.getString("binlogFile");
-                String binlogOffest = jsonData.getString("binlogOffest");
-                String binlogMasterId = jsonData.getString("binlogMasterId");
-                String binlogTimestamp = jsonData.getString("binlogTimestamp");
-
-                BinlogPosition snapshotPosition = new BinlogPosition(binlogFile,
-                    Long.valueOf(binlogOffest == null ? "0" : binlogOffest),
-                    Long.valueOf(binlogMasterId == null ? "-2" : binlogMasterId),
-                    Long.valueOf(binlogTimestamp == null ? "0" : binlogTimestamp));
-                // data存储为Map<String,String>,每个分库一套建表
-                String sqlData = jsonData.getString("data");
-                JSONObject jsonObj = JSON.parseObject(sqlData);
-                for (Map.Entry entry : jsonObj.entrySet()) {
-                    // 记录到内存
-                    if (!memoryTableMeta.apply(snapshotPosition,
-                        ObjectUtils.toString(entry.getKey()),
-                        ObjectUtils.toString(entry.getValue()))) {
-                        return null;
-                    }
-                }
-
-                return snapshotPosition;
-
-            } catch (Throwable e) {
-                throw new RuntimeException("apply failed caused by : " + e.getMessage());
             }
             }
 
 
+            return snapshotPosition;
+        } catch (Throwable e) {
+            throw new CanalParseException("apply failed caused by : " + e.getMessage());
         }
         }
-
-        return null;
-
     }
     }
 
 
     private boolean applyHistoryOnMemory(BinlogPosition position, BinlogPosition rollbackPosition) {
     private boolean applyHistoryOnMemory(BinlogPosition position, BinlogPosition rollbackPosition) {
-        Map<String, String> content = new HashMap<String, String>();
-        content.put("binlogSnapshotTimestamp", String.valueOf(position.getTimestamp()));
-        content.put("binlogFile", rollbackPosition.getFileName());
-        content.put("binlogOffest", String.valueOf(rollbackPosition.getPosition()));
-        content.put("binlogMasterId", String.valueOf(rollbackPosition.getMasterId()));
-        content.put("binlogTimestamp", String.valueOf(rollbackPosition.getTimestamp()));
-        String timestamp = content.get("binlogTimestamp");
-        String binlogSnapshotTimestamp = content.get("binlogSnapshotTimestamp");
-
-        for (int i = 0; i < retry; i++) {
-            try {
-                List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp(
-                    Long.valueOf(binlogSnapshotTimestamp),
-                    Long.valueOf(timestamp));
-                JSONObject json = new JSONObject();
-                json.put("content", JSON.toJSONString(metaHistoryDOList));
-                String data = ObjectUtils.toString(json.get("content"));
-                JSONArray jsonArray = JSON.parseArray(data);
-                for (Object jsonObj : jsonArray) {
-                    JSONObject jsonData = (JSONObject)jsonObj;
-                    String binlogFile = jsonData.getString("binlogFile");
-                    String binlogOffest = jsonData.getString("binlogOffest");
-                    String binlogMasterId = jsonData.getString("binlogMasterId");
-                    String binlogTimestamp = jsonData.getString("binlogTimestamp");
-                    String useSchema = jsonData.getString("useSchema");
-                    String sqlData = jsonData.getString("sql");
-                    BinlogPosition snapshotPosition = new BinlogPosition(binlogFile,
-                        Long.valueOf(binlogOffest == null ? "0" : binlogOffest),
-                        Long.valueOf(binlogMasterId == null ? "-2" : binlogMasterId),
-                        Long.valueOf(binlogTimestamp == null ? "0" : binlogTimestamp));
-
-                    // 如果是同一秒内,对比一下history的位点,如果比期望的位点要大,忽略之
-                    if (snapshotPosition.getTimestamp() > rollbackPosition.getTimestamp()) {
-                        continue;
-                    } else if (rollbackPosition.getMasterId() == snapshotPosition.getMasterId()
-                        && snapshotPosition.compareTo(rollbackPosition) > 0) {
-                        continue;
-                    }
-
-                    // 记录到内存
-                    if (!memoryTableMeta.apply(snapshotPosition, useSchema, sqlData)) {
-                        return false;
-                    }
-
+        try {
+            List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp(position.getTimestamp(),
+                rollbackPosition.getTimestamp());
+            for (MetaHistoryDO metaHistoryDO : metaHistoryDOList) {
+                String binlogFile = metaHistoryDO.getBinlogFile();
+                Long binlogOffest = metaHistoryDO.getBinlogOffest();
+                String binlogMasterId = metaHistoryDO.getBinlogMasterId();
+                Long binlogTimestamp = metaHistoryDO.getBinlogTimestamp();
+                String useSchema = metaHistoryDO.getUseSchema();
+                String sqlData = metaHistoryDO.getSql();
+                BinlogPosition snapshotPosition = new BinlogPosition(binlogFile,
+                    binlogOffest == null ? 0L : binlogOffest,
+                    Long.valueOf(binlogMasterId == null ? "-2" : binlogMasterId),
+                    binlogTimestamp == null ? 0L : binlogTimestamp);
+
+                // 如果是同一秒内,对比一下history的位点,如果比期望的位点要大,忽略之
+                if (snapshotPosition.getTimestamp() > rollbackPosition.getTimestamp()) {
+                    continue;
+                } else if (rollbackPosition.getMasterId() == snapshotPosition.getMasterId()
+                           && snapshotPosition.compareTo(rollbackPosition) > 0) {
+                    continue;
                 }
                 }
 
 
-                return jsonArray.size() > 0;
-            } catch (Throwable e) {
-
-                throw new RuntimeException("apply failed", e);
+                // 记录到内存
+                if (!memoryTableMeta.apply(snapshotPosition, useSchema, sqlData)) {
+                    return false;
+                }
 
 
             }
             }
 
 
+            return metaHistoryDOList.size() > 0;
+        } catch (Throwable e) {
+            throw new CanalParseException("apply failed", e);
         }
         }
-
-        return false;
     }
     }
 
 
     private String getFullName(String schema, String table) {
     private String getFullName(String schema, String table) {

+ 4 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDAO.java

@@ -3,10 +3,10 @@ package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 
 
-import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO;
+import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
 
 
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Maps;
-import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
 
 
 /**
 /**
  * canal数据的存储
  * canal数据的存储
@@ -14,7 +14,6 @@ import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
  * @author wanshao 2017年7月27日 下午10:51:55
  * @author wanshao 2017年7月27日 下午10:51:55
  * @since 3.2.5
  * @since 3.2.5
  */
  */
-@SuppressWarnings("deprecation")
 public class MetaHistoryDAO extends SqlMapClientDaoSupport {
 public class MetaHistoryDAO extends SqlMapClientDaoSupport {
 
 
     public List<MetaHistoryDO> getAll() {
     public List<MetaHistoryDO> getAll() {
@@ -22,14 +21,14 @@ public class MetaHistoryDAO extends SqlMapClientDaoSupport {
     }
     }
 
 
     public Long insert(MetaHistoryDO metaDO) {
     public Long insert(MetaHistoryDO metaDO) {
-        return (Long)getSqlMapClientTemplate().insert("table_meta_history.insert", metaDO);
+        return (Long) getSqlMapClientTemplate().insert("table_meta_history.insert", metaDO);
     }
     }
 
 
     public List<MetaHistoryDO> findByTimestamp(long snapshotTimestamp, long timestamp) {
     public List<MetaHistoryDO> findByTimestamp(long snapshotTimestamp, long timestamp) {
         HashMap params = Maps.newHashMapWithExpectedSize(2);
         HashMap params = Maps.newHashMapWithExpectedSize(2);
         params.put("snapshotTimestamp", snapshotTimestamp);
         params.put("snapshotTimestamp", snapshotTimestamp);
         params.put("timestamp", timestamp);
         params.put("timestamp", timestamp);
-        return (List<MetaHistoryDO>)getSqlMapClientTemplate().queryForList("table_meta_history.findByTimestamp",
+        return (List<MetaHistoryDO>) getSqlMapClientTemplate().queryForList("table_meta_history.findByTimestamp",
             params);
             params);
     }
     }
 
 

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaSnapshotDAO.java

@@ -2,10 +2,10 @@ package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
 
 
 import java.util.HashMap;
 import java.util.HashMap;
 
 
-import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaSnapshotDO;
+import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
 
 
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaSnapshotDO;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Maps;
-import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
 
 
 /**
 /**
  * canal数据的存储
  * canal数据的存储
@@ -15,6 +15,7 @@ import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
  */
  */
 
 
 public class MetaSnapshotDAO extends SqlMapClientDaoSupport {
 public class MetaSnapshotDAO extends SqlMapClientDaoSupport {
+
     public Long insert(MetaSnapshotDO snapshotDO) {
     public Long insert(MetaSnapshotDO snapshotDO) {
         return (Long) getSqlMapClientTemplate().insert("table_meta_snapshot.insert", snapshotDO);
         return (Long) getSqlMapClientTemplate().insert("table_meta_snapshot.insert", snapshotDO);
     }
     }
@@ -26,8 +27,7 @@ public class MetaSnapshotDAO extends SqlMapClientDaoSupport {
     public MetaSnapshotDO findByTimestamp(long timestamp) {
     public MetaSnapshotDO findByTimestamp(long timestamp) {
         HashMap params = Maps.newHashMapWithExpectedSize(2);
         HashMap params = Maps.newHashMapWithExpectedSize(2);
         params.put("timestamp", timestamp);
         params.put("timestamp", timestamp);
-        return (MetaSnapshotDO) getSqlMapClientTemplate().queryForObject("table_meta_snapshot.findByTimestamp",
-            params);
+        return (MetaSnapshotDO) getSqlMapClientTemplate().queryForObject("table_meta_snapshot.findByTimestamp", params);
     }
     }
 
 
     public Integer deleteByTask(String taskName) {
     public Integer deleteByTask(String taskName) {

+ 5 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/model/MetaHistoryDO.java

@@ -11,22 +11,22 @@ public class MetaHistoryDO {
     /**
     /**
      * 主键
      * 主键
      */
      */
-    private Long id;
+    private Long   id;
 
 
     /**
     /**
      * 创建时间
      * 创建时间
      */
      */
-    private Date gmtCreate;
+    private Date   gmtCreate;
 
 
     /**
     /**
      * 修改时间
      * 修改时间
      */
      */
-    private Date gmtModified;
+    private Date   gmtModified;
 
 
     private String binlogFile;
     private String binlogFile;
-    private Long binlogOffest;
+    private Long   binlogOffest;
     private String binlogMasterId;
     private String binlogMasterId;
-    private Long binlogTimestamp;
+    private Long   binlogTimestamp;
     private String useSchema;
     private String useSchema;
     private String schema;
     private String schema;
     private String table;
     private String table;

+ 5 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/model/MetaSnapshotDO.java

@@ -11,22 +11,22 @@ public class MetaSnapshotDO {
     /**
     /**
      * 主键
      * 主键
      */
      */
-    private Long id;
+    private Long   id;
 
 
     /**
     /**
      * 创建时间
      * 创建时间
      */
      */
-    private Date gmtCreate;
+    private Date   gmtCreate;
 
 
     /**
     /**
      * 修改时间
      * 修改时间
      */
      */
-    private Date gmtModified;
+    private Date   gmtModified;
 
 
     private String binlogFile;
     private String binlogFile;
-    private Long binlogOffest;
+    private Long   binlogOffest;
     private String binlogMasterId;
     private String binlogMasterId;
-    private Long binlogTimestamp;
+    private Long   binlogTimestamp;
     private String data;
     private String data;
     private String extra;
     private String extra;
 
 

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java

@@ -45,6 +45,7 @@ public class FileMixedLogPositionManager extends AbstractLogPositionManager {
 
 
     private ScheduledExecutorService executorService;
     private ScheduledExecutorService executorService;
 
 
+    @SuppressWarnings("serial")
     private final LogPosition        nullPosition = new LogPosition() {
     private final LogPosition        nullPosition = new LogPosition() {
                                                   };
                                                   };
 
 

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java

@@ -29,6 +29,7 @@ public class PeriodMixedLogPositionManager extends AbstractLogPositionManager {
     private long                        period;
     private long                        period;
     private Set<String>                 persistTasks;
     private Set<String>                 persistTasks;
 
 
+    @SuppressWarnings("serial")
     private final LogPosition           nullPosition = new LogPosition() {
     private final LogPosition           nullPosition = new LogPosition() {
                                                      };
                                                      };
 
 

+ 31 - 22
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/TableMetaCacheTest.java

@@ -2,34 +2,43 @@ package com.alibaba.otter.canal.parse.inbound;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
+import java.util.List;
 
 
-import com.taobao.tddl.dbsync.binlog.BinlogPosition;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
-import com.alibaba.druid.sql.visitor.functions.Bin;
-import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 
 
 public class TableMetaCacheTest {
 public class TableMetaCacheTest {
 
 
-    //@Test
-    //public void testSimple() {
-    //
-    //    MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "xxxxx", "xxxxx");
-    //    try {
-    //        connection.connect();
-    //    } catch (IOException e) {
-    //        Assert.fail(e.getMessage());
-    //    }
-    //
-    //    TableMetaCache cache = new TableMetaCache(connection);
-    //    TableMeta meta = cache.getTableMeta("otter1", "otter_stability1");
-    //    Assert.assertNotNull(meta);
-    //    for (FieldMeta field : meta.getFields()) {
-    //        System.out.println("filed :" + field.getColumnName() + " , isKey : " + field.isKey() + " , isNull : "
-    //                           + field.isNullable());
-    //    }
-    //}
+    @Test
+    public void testSimple() throws IOException {
+
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("100.81.153.8", 3308),
+            "jingwei",
+            "jingwei");
+        try {
+            connection.connect();
+        } catch (IOException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        List<ResultSetPacket> packets = connection.queryMulti("show create table test.ljh_test; show create table test.user");
+        String createDDL = null;
+        if (packets.get(0).getFieldValues().size() > 0) {
+            createDDL = packets.get(0).getFieldValues().get(1);
+        }
+
+        System.out.println(createDDL);
+
+        // TableMetaCache cache = new TableMetaCache(connection);
+        // TableMeta meta = cache.getTableMeta("otter1", "otter_stability1");
+        // Assert.assertNotNull(meta);
+        // for (FieldMeta field : meta.getFields()) {
+        // System.out.println("filed :" + field.getColumnName() + " , isKey : "
+        // + field.isKey() + " , isNull : "
+        // + field.isNullable());
+        // }
+    }
 }
 }

+ 0 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/GroupEventPaserTest.java

@@ -88,7 +88,6 @@ public class GroupEventPaserTest {
     private BinlogParser buildParser(AuthenticationInfo info) {
     private BinlogParser buildParser(AuthenticationInfo info) {
         return new AbstractBinlogParser<LogEvent>() {
         return new AbstractBinlogParser<LogEvent>() {
 
 
-
             @Override
             @Override
             public Entry parse(LogEvent event, boolean isSeek) throws CanalParseException {
             public Entry parse(LogEvent event, boolean isSeek) throws CanalParseException {
                 return null;
                 return null;

+ 2 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/SimpleDdlParserTest.java

@@ -3,8 +3,8 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.SimpleDdlParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.SimpleDdlParser.DdlResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DdlResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.SimpleDdlParser;
 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
 
 
 public class SimpleDdlParserTest {
 public class SimpleDdlParserTest {

+ 2 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMetaTest.java

@@ -19,7 +19,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  * @author agapple 2017年8月1日 下午7:15:54
  * @author agapple 2017年8月1日 下午7:15:54
  */
  */
 @RunWith(SpringJUnit4ClassRunner.class)
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(locations = {"/dal-dao.xml"})
+@ContextConfiguration(locations = { "/dal-dao.xml" })
 public class MemoryTableMetaTest {
 public class MemoryTableMetaTest {
 
 
     @Test
     @Test
@@ -33,6 +33,6 @@ public class MemoryTableMetaTest {
 
 
         TableMeta meta = memoryTableMeta.find("test", "test");
         TableMeta meta = memoryTableMeta.find("test", "test");
         System.out.println(meta);
         System.out.println(meta);
-        Assert.assertTrue( meta.getFieldMetaByName("ID").isKey());
+        Assert.assertTrue(meta.getFieldMetaByName("ID").isKey());
     }
     }
 }
 }

+ 3 - 6
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaHistoryDAOTest.java

@@ -13,12 +13,10 @@ import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 
 /**
 /**
- * Created by wanshao
- * Date: 2017/9/20
- * Time: 下午5:00
+ * Created by wanshao Date: 2017/9/20 Time: 下午5:00
  **/
  **/
 @RunWith(SpringJUnit4ClassRunner.class)
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(locations = {"/dal-dao.xml"})
+@ContextConfiguration(locations = { "/dal-dao.xml" })
 public class MetaHistoryDAOTest {
 public class MetaHistoryDAOTest {
 
 
     @Resource
     @Resource
@@ -26,8 +24,7 @@ public class MetaHistoryDAOTest {
 
 
     @Test
     @Test
     public void testGetAll() {
     public void testGetAll() {
-        List<MetaHistoryDO>
-            metaHistoryDOList = metaHistoryDAO.getAll();
+        List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.getAll();
         for (MetaHistoryDO metaHistoryDO : metaHistoryDOList) {
         for (MetaHistoryDO metaHistoryDO : metaHistoryDOList) {
             System.out.println(metaHistoryDO.getId());
             System.out.println(metaHistoryDO.getId());
         }
         }

+ 0 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaManagerTest.java

@@ -30,8 +30,6 @@ public class TableMetaManagerTest {
     @Test
     @Test
     public void testSimple() throws FileNotFoundException, IOException {
     public void testSimple() throws FileNotFoundException, IOException {
 
 
-
-
         URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
         URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
         File dummyFile = new File(url.getFile());
         File dummyFile = new File(url.getFile());
         File create = new File(dummyFile.getParent() + "/ddl", "create.sql");
         File create = new File(dummyFile.getParent() + "/ddl", "create.sql");

+ 0 - 46
parse/src/test/resources/dal-dao.xml

@@ -1,46 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xmlns="http://www.springframework.org/schema/beans" xmlns:tx="http://www.springframework.org/schema/tx"
-       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
-	http://www.springframework.org/schema/tx
-    http://www.springframework.org/schema/tx/spring-tx-2.0.xsd"
-       default-autowire="byName">
-    <tx:annotation-driven/>
-
-    <bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
-          factory-method="getDataSource">
-        <constructor-arg index="0" value="127.0.0.1"/>
-        <constructor-arg index="1" value="canal"/>
-        <constructor-arg index="2" value="canal"/>
-        <constructor-arg index="3" value="true"/>
-        <constructor-arg index="4" value="tsdb"/>
-        <constructor-arg index="5" value="jdbc:mysql://127.0.0.1:3306/tsdb"/>
-        <constructor-arg index="6" value="com.mysql.jdbc.Driver"/>
-    </bean>
-
-    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
-        <property name="dataSource" ref="dataSource"/>
-    </bean>
-
-    <bean id="txTemplate" class="org.springframework.transaction.support.TransactionTemplate">
-        <property name="transactionManager" ref="transactionManager"></property>
-        <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"></property>
-    </bean>
-
-    <bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">
-        <property name="dataSource" ref="dataSource"/>
-        <property name="configLocation" value="classpath:sqlmap-config.xml"/>
-    </bean>
-
-
-    <bean id="MetaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
-        <property name="sqlMapClient" ref="sqlMapClient"/>
-    </bean>
-
-    <bean id="MetaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
-        <property name="sqlMapClient" ref="sqlMapClient"/>
-    </bean>
-
-    <bean id="PersistTableMeta" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager"></bean>
-
-</beans>

+ 3 - 6
parse/src/main/resources/dal-dao.xml → parse/src/test/resources/tsdb/dal-dao.xml

@@ -9,13 +9,10 @@
 
 
     <bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
     <bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
           factory-method="getDataSource">
           factory-method="getDataSource">
-        <constructor-arg index="0" value="${canal.instance.tsdb.address}"/>
+        <constructor-arg index="0" value="${canal.instance.tsdb.url}"/>
         <constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
         <constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
         <constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
         <constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
         <constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
         <constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
-        <constructor-arg index="4" value="${canal.instance.tsdb.defaultDatabaseName}"/>
-        <constructor-arg index="5" value="${canal.instance.tsdb.url}"/>
-        <constructor-arg index="6" value="${canal.instance.tsdb.driverClassName}"/>
     </bean>
     </bean>
 
 
     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
     <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
@@ -32,11 +29,11 @@
         <property name="configLocation" value="classpath:sqlmap-config.xml"/>
         <property name="configLocation" value="classpath:sqlmap-config.xml"/>
     </bean>
     </bean>
 
 
-    <bean id="MetaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
+    <bean id="metaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
         <property name="sqlMapClient" ref="sqlMapClient"/>
         <property name="sqlMapClient" ref="sqlMapClient"/>
     </bean>
     </bean>
 
 
-    <bean id="MetaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
+    <bean id="metaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
         <property name="sqlMapClient" ref="sqlMapClient"/>
         <property name="sqlMapClient" ref="sqlMapClient"/>
     </bean>
     </bean>
 </beans>
 </beans>

+ 0 - 0
parse/src/main/resources/mybatis-config.xml → parse/src/test/resources/tsdb/mybatis-config.xml


+ 0 - 0
deployer/src/main/resources/spring/sql-map/sqlmap_history.xml → parse/src/test/resources/tsdb/sql-map/sqlmap_history.xml


+ 0 - 0
parse/src/main/resources/sql-map/sqlmap_snapshot.xml → parse/src/test/resources/tsdb/sql-map/sqlmap_snapshot.xml


+ 0 - 0
parse/src/main/resources/sqlmap-config.xml → parse/src/test/resources/tsdb/sqlmap-config.xml