Ver código fonte

fixed rds binlog local

agapple 7 anos atrás
pai
commit
77342663e3
30 arquivos alterados com 1174 adições e 130 exclusões
  1. 4 0
      common/pom.xml
  2. 1 3
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java
  3. 1 0
      deployer/src/main/resources/canal.properties
  4. 5 5
      deployer/src/main/resources/example/instance.properties
  5. 31 0
      deployer/src/main/resources/example/rds_instance.properties
  6. 2 2
      deployer/src/main/resources/spring/default-instance.xml
  7. 2 2
      deployer/src/main/resources/spring/file-instance.xml
  8. 2 2
      deployer/src/main/resources/spring/group-instance.xml
  9. 18 45
      deployer/src/main/resources/spring/local-instance.xml
  10. 1 1
      deployer/src/main/resources/spring/memory-instance.xml
  11. 1 1
      deployer/src/main/resources/spring/tsdb/h2-tsdb.xml
  12. 1 1
      deployer/src/main/resources/spring/tsdb/mysql-tsdb.xml
  13. 8 0
      parse/pom.xml
  14. 29 39
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  15. 14 7
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java
  16. 1 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  17. 32 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  18. 38 6
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  19. 3 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DruidDdlParser.java
  20. 368 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/HttpHelper.java
  21. 334 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java
  22. 111 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java
  23. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
  24. 27 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogOpenApiTest.java
  25. 117 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsLocalBinlogDumpTest.java
  26. 1 1
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaManagerTest.java
  27. 1 1
      parse/src/test/resources/tsdb/derby-tsdb.xml
  28. 1 1
      parse/src/test/resources/tsdb/h2-tsdb.xml
  29. 1 1
      parse/src/test/resources/tsdb/mysql-tsdb.xml
  30. 15 0
      pom.xml

+ 4 - 0
common/pom.xml

@@ -35,6 +35,10 @@
 			<groupId>commons-lang</groupId>
 			<artifactId>commons-lang</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>commons-codec</groupId>
+			<artifactId>commons-codec</artifactId>
+		</dependency>
 		<dependency>
 			<groupId>com.alibaba</groupId>
 			<artifactId>fastjson</artifactId>

+ 1 - 3
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java

@@ -208,8 +208,7 @@ public class SpringInstanceConfigMonitor extends AbstractCanalLifeCycle implemen
                 action.reload(destination);
                 logger.info("auto notify reload {} successful.", destination);
             } catch (Throwable e) {
-                logger.error(String.format("scan reload found[%s] but reload failed",
-                    destination), e);
+                logger.error(String.format("scan reload found[%s] but reload failed", destination), e);
             }
         }
     }
@@ -312,5 +311,4 @@ public class SpringInstanceConfigMonitor extends AbstractCanalLifeCycle implemen
         }
 
     }
-
 }

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -66,6 +66,7 @@ canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
 canal.instance.global.mode = spring 
 canal.instance.global.lazy = false
 #canal.instance.global.manager.address = 127.0.0.1:1099
+#canal.instance.global.spring.xml = classpath:spring/local-instance.xml
 #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
 canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 #canal.instance.global.spring.xml = classpath:spring/default-instance.xml

+ 5 - 5
deployer/src/main/resources/example/instance.properties

@@ -1,10 +1,10 @@
 #################################################
 ## mysql serverId
-canal.instance.mysql.slaveId=1234
+canal.instance.mysql.slaveId=0
 # position info
 canal.instance.master.address=127.0.0.1:3306
-canal.instance.master.journal.name=mysql-bin.000001
-canal.instance.master.position=104606
+canal.instance.master.journal.name=
+canal.instance.master.position=
 canal.instance.master.timestamp=
 
 
@@ -27,7 +27,7 @@ canal.instance.dbPassword=canal
 canal.instance.defaultDatabaseName=test
 canal.instance.connectionCharset=UTF-8
 # table regex
-canal.instance.filter.regex=test\\..*
+canal.instance.filter.regex=*\\..*
 # table black regex
-canal.instance.filter.black.regex=canal_tsdb\\..*
+canal.instance.filter.black.regex=
 #################################################

+ 31 - 0
deployer/src/main/resources/example/rds_instance.properties

@@ -0,0 +1,31 @@
+#################################################
+canal.instance.rds.open.url=https://rds.aliyuncs.com/
+canal.instance.rds.open.accesskey=
+canal.instance.rds.open.secretkey=
+canal.instance.rds.instanceId=
+canal.instance.rds.startTime=
+canal.instance.rds.endTime=
+
+# local binlog dir
+canal.instance.parser.directory=${canal.file.data.dir:../conf}/${canal.instance.destination:}/binlog
+# position info
+canal.instance.master.address=127.0.0.1:3306
+
+# table meta tsdb info
+canal.instance.tsdb.enable=true
+canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
+canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
+#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
+#canal.instance.tsdb.dbUsername=canal
+#canal.instance.tsdb.dbPassword=canal
+
+# username/password
+canal.instance.dbUsername=canal
+canal.instance.dbPassword=canal
+canal.instance.defaultDatabaseName=test
+canal.instance.connectionCharset=UTF-8
+# table regex
+canal.instance.filter.regex=test\\..*
+# table black regex
+canal.instance.filter.black.regex=
+#################################################

+ 2 - 2
deployer/src/main/resources/spring/default-instance.xml

@@ -85,7 +85,7 @@
 
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
-		<property name="slaveId" value="${canal.instance.mysql.slaveId:1234}" />
+		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->
 		<property name="detectingEnable" value="${canal.instance.detecting.enable:false}" />
 		<property name="detectingSQL" value="${canal.instance.detecting.sql}" />
@@ -185,7 +185,7 @@
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 		
 		<!--表结构相关-->
-		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
+		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 	</bean>
 </beans>

+ 2 - 2
deployer/src/main/resources/spring/file-instance.xml

@@ -71,7 +71,7 @@
 
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
-		<property name="slaveId" value="${canal.instance.mysql.slaveId:1234}" />
+		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->
 		<property name="detectingEnable" value="${canal.instance.detecting.enable:false}" />
 		<property name="detectingSQL" value="${canal.instance.detecting.sql}" />
@@ -170,7 +170,7 @@
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
 
 		<!--表结构相关-->
-		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
+		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 	</bean>
 </beans>

+ 2 - 2
deployer/src/main/resources/spring/group-instance.xml

@@ -77,7 +77,7 @@
 
 	<bean id="eventParser1" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
-		<property name="slaveId" value="${canal.instance.mysql.slaveId:1234}" />
+		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->
 		<property name="detectingEnable" value="${canal.instance.detecting.enable:false}" />
 		<property name="detectingSQL" value="${canal.instance.detecting.sql}" />
@@ -168,7 +168,7 @@
 	
 	<bean id="eventParser2" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
-		<property name="slaveId" value="${canal.instance.mysql.slaveId:1234}" />
+		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->
 		<property name="detectingEnable" value="${canal.instance.detecting.enable:false}" />
 		<property name="detectingSQL" value="${canal.instance.detecting.sql}" />

+ 18 - 45
deployer/src/main/resources/spring/local-instance.xml

@@ -48,41 +48,14 @@
 		<property name="alarmHandler">
 			<ref local="alarmHandler" />
 		</property>
-		<property name="eventFetcher">
-			<ref local="eventFetcher"/>
-		</property>
-	</bean>
-	
-	<bean id="eventFetcher" class="com.alibaba.otter.canal.mq.fetch.CanalEventFetcherWithMq">
-		<property name="enable" value="${canal.instance.mq.enable}"/>
-		<property name="sendMode" value="${canal.instance.mq.sendMode}"/>
 	</bean>
 	
 	<!-- 报警处理类 -->
-	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.MsgAlarmHandler">
-		<property name="enabled" value="${canal.alarm.enable}" />
-		<property name="environment" value="${canal.alarm.environment}" />
-		<property name="intercenterApi" value="${canal.alarm.intercenter.api}" />
-		<property name="administrators" value="${canal.alarm.administrators}" />
-	</bean>
-	
-	<bean id="zkClientx" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean" >
-		<property name="targetClass" value="com.alibaba.otter.canal.common.zookeeper.ZkClientx" />
-		<property name="targetMethod" value="getZkClient" />
-		<property name="arguments">
-			<list>
-				<value>${canal.zkServers:127.0.0.1:2181}</value>
-			</list>
-		</property>
-	</bean>
+	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
 	
-	<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
-		<property name="zooKeeperMetaManager">
-			<bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
-				<property name="zkClientx" ref="zkClientx" />
-			</bean>
-		</property>
-		<property name="period" value="${canal.zookeeper.flush.period:1000}" />
+	<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
+		<property name="dataDir" value="${canal.file.data.dir:../conf}" />
+		<property name="period" value="${canal.file.flush.period:1000}" />
 	</bean>
 	
 	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
@@ -96,10 +69,10 @@
 		<property name="eventStore" ref="eventStore" />
 	</bean>
 
-	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser">
+	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsLocalBinlogEventParser">
 		<property name="destination" value="${canal.instance.destination}" />			
 		<property name="alarmHandler" ref="alarmHandler" />
-		
+
 		<!-- 解析过滤处理 -->
 		<property name="eventFilter">
 			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
@@ -144,25 +117,25 @@
 			</bean>
 		</property>
 		
-		<!-- 解析起始位点 -->
-		<property name="masterPosition">
-			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
-				<property name="journalName" value="${canal.instance.master.journal.name}" />
-				<property name="position" value="${canal.instance.master.position}" />
-				<property name="timestamp" value="${canal.instance.master.timestamp}" />
-			</bean>
-		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
 		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
 		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
 		<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
 		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
-		<property name="needWait" value="${canal.instance.parser.needWait:true}"/>
-		<property name="directory" value="${canal.instance.parser.directory}"/>
+		<property name="needWait" value="${canal.instance.parser.needWait:false}"/>
+		<property name="directory" value="${canal.instance.parser.directory:}"/>
+		
+		<!-- rds相关 -->
+		<property name="url" value="${canal.instance.rds.open.url:}"/>
+		<property name="accesskey" value="${canal.instance.rds.open.accesskey:}"/>
+		<property name="secretkey" value="${canal.instance.rds.open.secretkey:}"/>
+		<property name="instanceId" value="${canal.instance.rds.instanceId:}"/>
+		<property name="startTime" value="${canal.instance.rds.startTime:}"/>
+		<property name="endTime" value="${canal.instance.rds.endTime:}"/>
 		
 		<!--表结构相关-->
-		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
+		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 	</bean>
-</beans>
+</beans>

+ 1 - 1
deployer/src/main/resources/spring/memory-instance.xml

@@ -68,7 +68,7 @@
 
 	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
-		<property name="slaveId" value="${canal.instance.mysql.slaveId:1234}" />
+		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->
 		<property name="detectingEnable" value="${canal.instance.detecting.enable:false}" />
 		<property name="detectingSQL" value="${canal.instance.detecting.sql}" />

+ 1 - 1
deployer/src/main/resources/spring/tsdb/h2-tsdb.xml

@@ -23,7 +23,7 @@
 	</bean>
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
deployer/src/main/resources/spring/tsdb/mysql-tsdb.xml

@@ -23,7 +23,7 @@
 	</bean>
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 8 - 0
parse/pom.xml

@@ -62,6 +62,14 @@
 			<groupId>com.h2database</groupId>
 			<artifactId>h2</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-compress</artifactId>
+		</dependency>
 		<!-- test dependency -->
 		<dependency>
 			<groupId>junit</groupId>

+ 29 - 39
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -8,6 +8,7 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
@@ -84,26 +85,19 @@ public class LocalBinLogConnection implements ErosaConnection {
             while (running) {
                 boolean needContinue = true;
                 LogEvent event = null;
-                L: while (fetcher.fetch()) {
-                    /*
-                     * event = decoder.decode(fetcher, context); if (event ==
-                     * null) { throw new CanalParseException("parse failed"); }
-                     * if (!func.sink(event)) { needContinue = false; break; }
-                     */
-
-                    do {
-                        if (event == null) {
-                            event = new RotateLogEvent(context.getLogPosition().getFileName(), context.getLogPosition()
-                                .getPosition());
-                        } else {
-                            event = decoder.decode(fetcher, context);
-                        }
+                // 处理一下binlog文件名
+                event = new RotateLogEvent(context.getLogPosition().getFileName(), 4);
+                func.sink(event);
+                while (fetcher.fetch()) {
+                    event = decoder.decode(fetcher, context);
+                    if (event == null) {
+                        throw new CanalParseException("parse failed");
+                    }
 
-                        if (event != null && !func.sink(event)) {
-                            needContinue = false;
-                            break L;
-                        }
-                    } while (event != null);
+                    if (!func.sink(event)) {
+                        needContinue = false;
+                        break;
+                    }
                 }
 
                 if (needContinue) {// 读取下一个
@@ -121,7 +115,6 @@ public class LocalBinLogConnection implements ErosaConnection {
                     }
 
                     current = nextFile;
-
                     fetcher.open(current);
                     context.setLogPosition(new LogPosition(nextFile.getName()));
                 } else {
@@ -161,30 +154,27 @@ public class LocalBinLogConnection implements ErosaConnection {
 
                 binlogFilename = lastXidLogFilename;
                 binlogFileOffset = lastXidLogFileOffset;
-                L: while (fetcher.fetch()) {
-                    LogEvent event;
-                    do {
-                        event = decoder.decode(fetcher, context);
-                        if (event != null) {
-                            if (event.getWhen() > timestampSeconds) {
-                                break L;
-                            }
+                while (fetcher.fetch()) {
+                    LogEvent event = decoder.decode(fetcher, context);
+                    if (event != null) {
+                        if (event.getWhen() > timestampSeconds) {
+                            break;
+                        }
 
-                            needContinue = false;
-                            if (LogEvent.QUERY_EVENT == event.getHeader().getType()) {
-                                if (StringUtils.endsWithIgnoreCase(((QueryLogEvent) event).getQuery(), "BEGIN")) {
-                                    binlogFilename = lastXidLogFilename;
-                                    binlogFileOffset = lastXidLogFileOffset;
-                                } else if (StringUtils.endsWithIgnoreCase(((QueryLogEvent) event).getQuery(), "COMMIT")) {
-                                    lastXidLogFilename = current.getName();
-                                    lastXidLogFileOffset = event.getLogPos();
-                                }
-                            } else if (LogEvent.XID_EVENT == event.getHeader().getType()) {
+                        needContinue = false;
+                        if (LogEvent.QUERY_EVENT == event.getHeader().getType()) {
+                            if (StringUtils.endsWithIgnoreCase(((QueryLogEvent) event).getQuery(), "BEGIN")) {
+                                binlogFilename = lastXidLogFilename;
+                                binlogFileOffset = lastXidLogFileOffset;
+                            } else if (StringUtils.endsWithIgnoreCase(((QueryLogEvent) event).getQuery(), "COMMIT")) {
                                 lastXidLogFilename = current.getName();
                                 lastXidLogFileOffset = event.getLogPos();
                             }
+                        } else if (LogEvent.XID_EVENT == event.getHeader().getType()) {
+                            lastXidLogFilename = current.getName();
+                            lastXidLogFileOffset = event.getLogPos();
                         }
-                    } while (event != null);
+                    }
                 }
 
                 if (needContinue) {// 读取下一个

+ 14 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -9,6 +9,7 @@ import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -23,14 +24,14 @@ import com.alibaba.otter.canal.protocol.position.LogPosition;
 public class LocalBinlogEventParser extends AbstractMysqlEventParser implements CanalEventParser {
 
     // 数据库信息
-    private AuthenticationInfo masterInfo;
-    private EntryPosition      masterPosition;        // binlog信息
-    private MysqlConnection    metaConnection;        // 查询meta信息的链接
-    private TableMetaCache     tableMetaCache;        // 对应meta
+    protected AuthenticationInfo masterInfo;
+    protected EntryPosition      masterPosition;        // binlog信息
+    protected MysqlConnection    metaConnection;        // 查询meta信息的链接
+    protected TableMetaCache     tableMetaCache;        // 对应meta
 
-    private String             directory;
-    private boolean            needWait   = false;
-    private int                bufferSize = 16 * 1024;
+    protected String             directory;
+    protected boolean            needWait   = false;
+    protected int                bufferSize = 16 * 1024;
 
     public LocalBinlogEventParser(){
         // this.runningInfo = new AuthenticationInfo();
@@ -50,6 +51,12 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
             throw new CanalParseException(e);
         }
 
+        if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
+            ((DatabaseTableMeta) tableMetaTSDB).setConnection(metaConnection);
+            ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
+            ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
+        }
+
         tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
         ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
     }

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -156,6 +156,7 @@ public class MysqlConnection implements ErosaConnection {
         BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
         binlogDumpCmd.binlogFileName = binlogfilename;
         binlogDumpCmd.binlogPosition = binlogPosition;
+        // binlogDumpCmd.slaveServerId = this.slaveId;
         binlogDumpCmd.slaveServerId = this.slaveId;
         byte[] cmdBody = binlogDumpCmd.toBytes();
 

+ 32 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,7 +28,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogFormat;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogImage;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
-import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -118,10 +120,10 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 }
             }
 
-            if (tableMetaTSDB != null && tableMetaTSDB instanceof TableMetaManager) {
-                ((TableMetaManager) tableMetaTSDB).setConnection(metaConnection);
-                ((TableMetaManager) tableMetaTSDB).setFilter(eventFilter);
-                ((TableMetaManager) tableMetaTSDB).setBlackFilter(eventBlackFilter);
+            if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
+                ((DatabaseTableMeta) tableMetaTSDB).setConnection(metaConnection);
+                ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
+                ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
             }
 
             tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
@@ -313,10 +315,35 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         connection.getConnector().setSendBufferSize(sendBufferSize);
         connection.getConnector().setSoTimeout(defaultConnectionTimeoutInSeconds * 1000);
         connection.setCharset(connectionCharset);
+        // 随机生成slaveId
+        if (this.slaveId <= 0) {
+            this.slaveId = generateUniqueServerId();
+        }
         connection.setSlaveId(this.slaveId);
         return connection;
     }
 
+    private final long generateUniqueServerId() {
+        try {
+            // a=`echo $masterip|cut -d\. -f1`
+            // b=`echo $masterip|cut -d\. -f2`
+            // c=`echo $masterip|cut -d\. -f3`
+            // d=`echo $masterip|cut -d\. -f4`
+            // #server_id=`expr $a \* 256 \* 256 \* 256 + $b \* 256 \* 256 + $c
+            // \* 256 + $d `
+            // #server_id=$b$c$d
+            // server_id=`expr $b \* 256 \* 256 + $c \* 256 + $d `
+            InetAddress localHost = InetAddress.getLocalHost();
+            byte[] addr = localHost.getAddress();
+            int salt = (destination != null) ? destination.hashCode() : 0;
+            return ((0x7f & salt) << 24) + ((0xff & (int) addr[1]) << 16) // NL
+                   + ((0xff & (int) addr[2]) << 8) // NL
+                   + (0xff & (int) addr[3]);
+        } catch (UnknownHostException e) {
+            throw new CanalParseException("Unknown host", e);
+        }
+    }
+
     protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
         EntryPosition startPosition = findStartPositionInternal(connection);
         if (needTransactionPosition.get()) {

+ 38 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -6,6 +6,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.sql.Types;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 
@@ -294,7 +295,15 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         String queryString = null;
         try {
             queryString = new String(event.getRowsQuery().getBytes(ISO_8859_1), charset.name());
-            return buildQueryEntry(queryString, event.getHeader());
+            String tableName = null;
+            if (useDruidDdlFilter) {
+                List<DdlResult> results = DruidDdlParser.parse(queryString, null);
+                if (results.size() > 0) {
+                    tableName = results.get(0).getTableName();
+                }
+            }
+
+            return buildQueryEntry(queryString, event.getHeader(), tableName);
         } catch (UnsupportedEncodingException e) {
             throw new CanalParseException(e);
         }
@@ -355,6 +364,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 throw new TableIdNotFoundException("not found tableId:" + event.getTableId());
             }
 
+            boolean isRDSHeartBeat = tableMetaCache.isOnRDS()
+                                     && isRDSHeartBeat(table.getDbName(), table.getTableName());
+
             String fullname = table.getDbName() + "." + table.getTableName();
             // check name filter
             if (nameFilter != null && !nameFilter.filter(fullname)) {
@@ -364,9 +376,17 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 return null;
             }
 
-            if (tableMetaCache.isOnRDS() && "mysql.ha_health_check".equals(fullname)) {
-                // 忽略rds模式的mysql.ha_health_check心跳数据
-                return null;
+            // if (isHeartBeat || isRDSHeartBeat) {
+            // // 忽略rds模式的mysql.ha_health_check心跳数据
+            // return null;
+            // }
+            TableMeta tableMeta = null;
+            if (isRDSHeartBeat) {
+                // 处理rds模式的mysql.ha_health_check心跳数据
+                // 主要RDS的心跳表基本无权限,需要mock一个tableMeta
+                FieldMeta idMeta = new FieldMeta("id", "bigint(20)", true, false, "0");
+                FieldMeta typeMeta = new FieldMeta("type", "char(1)", false, true, "0");
+                tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
             }
 
             EventType eventType = null;
@@ -396,8 +416,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             BitSet columns = event.getColumns();
             BitSet changeColumns = event.getChangeColumns();
             boolean tableError = false;
-            TableMeta tableMeta = null;
-            if (tableMetaCache != null) {// 入错存在table meta cache
+            if (tableMetaCache != null && tableMeta == null) {// 入错存在table meta
+                                                              // cache
                 tableMeta = getTableMeta(table.getDbName(), table.getTableName(), true, position);
                 if (tableMeta == null) {
                     tableError = true;
@@ -661,6 +681,14 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     }
 
+    private Entry buildQueryEntry(String queryString, LogHeader logHeader, String tableName) {
+        Header header = createHeader(binlogFileName, logHeader, "", tableName, EventType.QUERY);
+        RowChange.Builder rowChangeBuider = RowChange.newBuilder();
+        rowChangeBuider.setSql(queryString);
+        rowChangeBuider.setEventType(EventType.QUERY);
+        return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
+    }
+
     private Entry buildQueryEntry(String queryString, LogHeader logHeader) {
         Header header = createHeader(binlogFileName, logHeader, "", "", EventType.QUERY);
         RowChange.Builder rowChangeBuider = RowChange.newBuilder();
@@ -739,6 +767,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                || "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType);
     }
 
+    private boolean isRDSHeartBeat(String schema, String table) {
+        return "mysql".equalsIgnoreCase(schema) && "ha_health_check".equalsIgnoreCase(table);
+    }
+
     public static TransactionBegin createTransactionBegin(long threadId) {
         TransactionBegin.Builder beginBuilder = TransactionBegin.newBuilder();
         beginBuilder.setThreadId(threadId);

+ 3 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DruidDdlParser.java

@@ -140,21 +140,21 @@ public class DruidDdlParser {
             } else if (statement instanceof SQLInsertStatement) {
                 DdlResult ddlResult = new DdlResult();
                 SQLInsertStatement insert = (SQLInsertStatement) statement;
-                processName(ddlResult, schmeaName, insert.getTableName(), true);
+                processName(ddlResult, schmeaName, insert.getTableName(), false);
                 ddlResult.setType(EventType.INSERT);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLUpdateStatement) {
                 DdlResult ddlResult = new DdlResult();
                 SQLUpdateStatement update = (SQLUpdateStatement) statement;
                 // 拿到的表名可能为null,比如update a,b set a.id=x
-                processName(ddlResult, schmeaName, update.getTableName(), true);
+                processName(ddlResult, schmeaName, update.getTableName(), false);
                 ddlResult.setType(EventType.UPDATE);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLDeleteStatement) {
                 DdlResult ddlResult = new DdlResult();
                 SQLDeleteStatement delete = (SQLDeleteStatement) statement;
                 // 拿到的表名可能为null,比如delete a,b from a where a.id = b.id
-                processName(ddlResult, schmeaName, delete.getTableName(), true);
+                processName(ddlResult, schmeaName, delete.getTableName(), false);
                 ddlResult.setType(EventType.DELETE);
                 ddlResults.add(ddlResult);
             }

+ 368 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/HttpHelper.java

@@ -0,0 +1,368 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds;
+
+import static org.apache.http.client.config.RequestConfig.custom;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.CookieStore;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.cookie.Cookie;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class HttpHelper {
+
+    protected static final Logger logger = LoggerFactory.getLogger(HttpHelper.class);
+
+    public static byte[] getBytes(String url, int timeout) throws Exception {
+        long start = System.currentTimeMillis();
+        HttpClientBuilder builder = HttpClientBuilder.create();
+        builder.setMaxConnPerRoute(50);
+        builder.setMaxConnTotal(100);
+        CloseableHttpClient httpclient = builder.build();
+        URI uri = new URIBuilder(url).build();
+        RequestConfig config = custom().setConnectTimeout(timeout)
+            .setConnectionRequestTimeout(timeout)
+            .setSocketTimeout(timeout)
+            .build();
+        HttpGet httpGet = new HttpGet(uri);
+        HttpClientContext context = HttpClientContext.create();
+        context.setRequestConfig(config);
+        CloseableHttpResponse response = httpclient.execute(httpGet, context);
+        try {
+            int statusCode = response.getStatusLine().getStatusCode();
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            if (logger.isWarnEnabled()) {
+                logger.warn("post " + url + ", cost : " + cost);
+            }
+            if (statusCode == HttpStatus.SC_OK) {
+                return EntityUtils.toByteArray(response.getEntity());
+            } else {
+                String errorMsg = EntityUtils.toString(response.getEntity());
+                throw new RuntimeException("requestGet remote error, url=" + uri.toString() + ", code=" + statusCode
+                                           + ", error msg=" + errorMsg);
+            }
+        } finally {
+            response.close();
+            httpGet.releaseConnection();
+        }
+    }
+
+    public static String get(String url, int timeout) {
+        // logger.info("get url is :" + url);
+        // 支持采用https协议,忽略证书
+        url = url.trim();
+        if (url.startsWith("https")) {
+            return getIgnoreCerf(url, null, null, timeout);
+        }
+        long start = System.currentTimeMillis();
+        HttpClientBuilder builder = HttpClientBuilder.create();
+        builder.setMaxConnPerRoute(50);
+        builder.setMaxConnTotal(100);
+        CloseableHttpClient httpclient = builder.build();
+        CloseableHttpResponse response = null;
+        HttpGet httpGet = null;
+        try {
+            URI uri = new URIBuilder(url).build();
+            RequestConfig config = custom().setConnectTimeout(timeout)
+                .setConnectionRequestTimeout(timeout)
+                .setSocketTimeout(timeout)
+                .build();
+            httpGet = new HttpGet(uri);
+            HttpClientContext context = HttpClientContext.create();
+            context.setRequestConfig(config);
+            response = httpclient.execute(httpGet, context);
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                return EntityUtils.toString(response.getEntity());
+            } else {
+                String errorMsg = EntityUtils.toString(response.getEntity());
+                throw new RuntimeException("requestGet remote error, url=" + uri.toString() + ", code=" + statusCode
+                                           + ", error msg=" + errorMsg);
+            }
+        } catch (Throwable t) {
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            String curlRequest = getCurlRequest(url, null, null, cost);
+            throw new RuntimeException("requestGet remote error, request : " + curlRequest, t);
+        } finally {
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            printCurlRequest(url, null, null, cost);
+            if (response != null) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                }
+            }
+            httpGet.releaseConnection();
+        }
+    }
+
+    private static String getIgnoreCerf(String url, CookieStore cookieStore, Map<String, String> params, int timeout) {
+        long start = System.currentTimeMillis();
+        HttpClientBuilder builder = HttpClientBuilder.create();
+        builder.setMaxConnPerRoute(50);
+        builder.setMaxConnTotal(100);
+        HttpGet httpGet = null;
+        CloseableHttpResponse response = null;
+        try {
+            // 创建支持忽略证书的https
+            final SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
+
+                @Override
+                public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+                    return true;
+                }
+            }).build();
+
+            CloseableHttpClient httpClient = HttpClientBuilder.create()
+                .setSSLContext(sslContext)
+                .setConnectionManager(new PoolingHttpClientConnectionManager(RegistryBuilder.<ConnectionSocketFactory> create()
+                    .register("http", PlainConnectionSocketFactory.INSTANCE)
+                    .register("https", new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE))
+                    .build()))
+                .build();
+            // ---------------- 创建支持https 的client成功---------
+
+            URI uri = new URIBuilder(url).build();
+            RequestConfig config = custom().setConnectTimeout(timeout)
+                .setConnectionRequestTimeout(timeout)
+                .setSocketTimeout(timeout)
+                .build();
+            httpGet = new HttpGet(uri);
+            HttpClientContext context = HttpClientContext.create();
+            context.setRequestConfig(config);
+            response = httpClient.execute(httpGet, context);
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                return EntityUtils.toString(response.getEntity());
+            } else {
+                String errorMsg = EntityUtils.toString(response.getEntity());
+                throw new RuntimeException("requestGet remote error, url=" + uri.toString() + ", code=" + statusCode
+                                           + ", error msg=" + errorMsg);
+            }
+        } catch (Throwable t) {
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            String curlRequest = getCurlRequest(url, cookieStore, params, cost);
+            throw new RuntimeException("requestPost(Https) remote error, request : " + curlRequest, t);
+        } finally {
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            printCurlRequest(url, null, null, cost);
+            if (response != null) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                }
+            }
+            if (httpGet != null) {
+                httpGet.releaseConnection();
+            }
+        }
+    }
+
+    private static String postIgnoreCerf(String url, CookieStore cookieStore, Map<String, String> params, int timeout) {
+        long start = System.currentTimeMillis();
+        HttpClientBuilder builder = HttpClientBuilder.create();
+        builder.setMaxConnPerRoute(50);
+        builder.setMaxConnTotal(100);
+        HttpPost httpPost = null;
+        CloseableHttpResponse response = null;
+        try {
+            // 创建支持忽略证书的https
+            final SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
+
+                @Override
+                public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+                    return true;
+                }
+            }).build();
+
+            CloseableHttpClient httpClient = HttpClientBuilder.create()
+                .setSSLContext(sslContext)
+                .setConnectionManager(new PoolingHttpClientConnectionManager(RegistryBuilder.<ConnectionSocketFactory> create()
+                    .register("http", PlainConnectionSocketFactory.INSTANCE)
+                    .register("https", new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE))
+                    .build()))
+                .build();
+            // ---------------- 创建支持https 的client成功---------
+
+            URI uri = new URIBuilder(url).build();
+            RequestConfig config = custom().setConnectTimeout(timeout)
+                .setConnectionRequestTimeout(timeout)
+                .setSocketTimeout(timeout)
+                .build();
+            httpPost = new HttpPost(uri);
+            List<NameValuePair> parameters = Lists.newArrayList();
+            for (String key : params.keySet()) {
+                NameValuePair nameValuePair = new BasicNameValuePair(key, params.get(key));
+                parameters.add(nameValuePair);
+            }
+            httpPost.setEntity(new UrlEncodedFormEntity(parameters, Charset.forName("UTF-8")));
+            HttpClientContext context = HttpClientContext.create();
+            context.setRequestConfig(config);
+            context.setCookieStore(cookieStore);
+
+            response = httpClient.execute(httpPost, context);
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                printCurlRequest(url, cookieStore, params, cost);
+                return EntityUtils.toString(response.getEntity());
+            } else {
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                String curlRequest = getCurlRequest(url, cookieStore, params, cost);
+                throw new RuntimeException("requestPost(Https) remote error, request : " + curlRequest
+                                           + ", statusCode=" + statusCode + "");
+            }
+        } catch (Throwable t) {
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            String curlRequest = getCurlRequest(url, cookieStore, params, cost);
+            throw new RuntimeException("requestPost(Https) remote error, request : " + curlRequest, t);
+        } finally {
+            if (response != null) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                }
+            }
+            if (httpPost != null) {
+                httpPost.releaseConnection();
+            }
+        }
+    }
+
+    public static String post(String url, CookieStore cookieStore, Map<String, String> params, int timeout) {
+        url = url.trim();
+        // 支持采用https协议,忽略证书
+        if (url.startsWith("https")) {
+            return postIgnoreCerf(url, cookieStore, params, timeout);
+        }
+        long start = System.currentTimeMillis();
+        HttpClientBuilder builder = HttpClientBuilder.create();
+        builder.setMaxConnPerRoute(50);
+        builder.setMaxConnTotal(100);
+        HttpPost httpPost = null;
+        CloseableHttpResponse response = null;
+        try {
+            CloseableHttpClient httpclient = builder.build();
+            URI uri = new URIBuilder(url).build();
+            RequestConfig config = custom().setConnectTimeout(timeout)
+                .setConnectionRequestTimeout(timeout)
+                .setSocketTimeout(timeout)
+                .build();
+            httpPost = new HttpPost(uri);
+            List<NameValuePair> parameters = Lists.newArrayList();
+            for (String key : params.keySet()) {
+                NameValuePair nameValuePair = new BasicNameValuePair(key, params.get(key));
+                parameters.add(nameValuePair);
+            }
+            httpPost.setEntity(new UrlEncodedFormEntity(parameters, Charset.forName("UTF-8")));
+            HttpClientContext context = HttpClientContext.create();
+            context.setRequestConfig(config);
+            context.setCookieStore(cookieStore);
+
+            response = httpclient.execute(httpPost, context);
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                printCurlRequest(url, cookieStore, params, cost);
+                return EntityUtils.toString(response.getEntity());
+            } else {
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                String curlRequest = getCurlRequest(url, cookieStore, params, cost);
+                throw new RuntimeException("requestPost remote error, request : " + curlRequest + ", statusCode="
+                                           + statusCode + ";" + EntityUtils.toString(response.getEntity()));
+            }
+        } catch (Throwable t) {
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            String curlRequest = getCurlRequest(url, cookieStore, params, cost);
+            throw new RuntimeException("requestPost remote error, request : " + curlRequest, t);
+        } finally {
+            if (response != null) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                }
+            }
+            if (httpPost != null) {
+                httpPost.releaseConnection();
+            }
+        }
+    }
+
+    public static void printCurlRequest(String url, CookieStore cookieStore, Map<String, String> params, long cost) {
+        logger.warn(getCurlRequest(url, cookieStore, params, cost));
+    }
+
+    private static String getCurlRequest(String url, CookieStore cookieStore, Map<String, String> params, long cost) {
+        if (params == null) {
+            return "curl '" + url + "'\ncost : " + cost;
+        } else {
+            StringBuilder paramsStr = new StringBuilder();
+            Iterator<Map.Entry<String, String>> iterator = params.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, String> entry = iterator.next();
+                paramsStr.append(entry.getKey() + "=" + entry.getValue());
+                if (iterator.hasNext()) {
+                    paramsStr.append("&");
+                }
+            }
+            if (cookieStore == null) {
+                return "curl '" + url + "' -d '" + paramsStr.toString() + "'\ncost : " + cost;
+            } else {
+                StringBuilder cookieStr = new StringBuilder();
+                List<Cookie> cookies = cookieStore.getCookies();
+                Iterator<Cookie> iteratorCookie = cookies.iterator();
+                while (iteratorCookie.hasNext()) {
+                    Cookie cookie = iteratorCookie.next();
+                    cookieStr.append(cookie.getName() + "=" + cookie.getValue());
+                    if (iteratorCookie.hasNext()) {
+                        cookieStr.append(";");
+                    }
+                }
+                return "curl '" + url + "' -b '" + cookieStr + "' -d '" + paramsStr.toString() + "'\ncost : " + cost;
+            }
+        }
+    }
+}

+ 334 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java

@@ -0,0 +1,334 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ * @author agapple 2017年10月14日 下午1:53:52
+ * @since 1.0.25
+ */
+public class RdsBinlogOpenApi {
+
+    protected static final Logger logger              = LoggerFactory.getLogger(RdsBinlogOpenApi.class);
+    private static final String   ISO8601_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
+    private static final int      TIMEOUT             = 10000;
+    private static final String   ENCODING            = "UTF-8";
+    private static final String   MAC_NAME            = "HmacSHA1";
+    private static final String   API_VERSION         = "2014-08-15";
+    private static final String   SIGNATURE_VERSION   = "1.0";
+
+    public static void downloadBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
+                                           Date endTime, File destDir) throws Throwable {
+        int pageSize = 100;
+        int pageNumber = 0;
+        int pageRecordCount = 1;
+        String hostInstanceID = null;
+        while (pageRecordCount > 0 && pageRecordCount <= pageSize) {
+            pageNumber += 1;
+            String result = describeBinlogFiles(url, ak, sk, dbInstanceId, startTime, endTime, pageSize, pageNumber);
+            JSONObject jsobObj = JSON.parseObject(result);
+            pageRecordCount = jsobObj.getInteger("PageRecordCount");
+
+            if (pageRecordCount > 0) {
+                FileUtils.forceMkdir(destDir);
+                File hostIdFile = new File(destDir, "hostId");
+                if (hostIdFile.exists()) {
+                    List<String> lines = IOUtils.readLines(new FileInputStream(hostIdFile));
+                    hostInstanceID = StringUtils.join(lines, "\n");
+                }
+
+                String itemStr = jsobObj.getString("Items");
+                JSONObject binLogFileObj = JSONObject.parseObject(itemStr);
+                JSONArray items = binLogFileObj.getJSONArray("BinLogFile");
+                if (items == null || items.isEmpty()) {
+                    continue;
+                }
+                for (int i = 0; i < items.size(); i++) {
+                    JSONObject item = (JSONObject) items.get(i);
+                    String oneHostInstanceID = item.getString("HostInstanceID");
+                    if (hostInstanceID == null) {
+                        hostInstanceID = oneHostInstanceID;
+                        FileOutputStream hostIdFileOut = null;
+                        try {
+                            hostIdFileOut = new FileOutputStream(hostIdFile);
+                            hostIdFileOut.write(oneHostInstanceID.getBytes());
+                            hostIdFileOut.flush();
+                        } finally {
+                            IOUtils.closeQuietly(hostIdFileOut);
+                        }
+                    }
+
+                    if (hostInstanceID.equals(oneHostInstanceID)) { // 只选择一个host下载
+                        String downloadLink = item.getString("DownloadLink");
+                        String fileName = StringUtils.substringBetween(downloadLink, "mysql-bin.", ".tar");
+                        if (StringUtils.isNotEmpty(fileName)) {
+                            File currentFile = new File(destDir, "mysql-bin." + fileName);
+                            if (currentFile.isFile() && currentFile.exists()) {
+                                // 检查一下文件是否存在,存在就就没必要下载了
+                                continue;
+                            }
+                        }
+
+                        HttpGet httpGet = new HttpGet(downloadLink);
+                        CloseableHttpClient httpClient = HttpClientBuilder.create()
+                            .setMaxConnPerRoute(50)
+                            .setMaxConnTotal(100)
+                            .build();
+                        RequestConfig requestConfig = RequestConfig.custom()
+                            .setConnectTimeout(TIMEOUT)
+                            .setConnectionRequestTimeout(TIMEOUT)
+                            .setSocketTimeout(TIMEOUT)
+                            .build();
+                        httpGet.setConfig(requestConfig);
+                        HttpResponse response = httpClient.execute(httpGet);
+                        int statusCode = response.getStatusLine().getStatusCode();
+                        if (statusCode != HttpResponseStatus.OK.code()) {
+                            throw new RuntimeException("download failed , url:" + downloadLink + " , statusCode:"
+                                                       + statusCode);
+                        }
+                        saveFile(destDir, response);
+                    }
+                }
+            }
+        }
+    }
+
+    private static void saveFile(File parentFile, HttpResponse response) throws IOException {
+        InputStream is = response.getEntity().getContent();
+        long totalSize = Long.parseLong(response.getFirstHeader("Content-Length").getValue());
+        String fileName = response.getFirstHeader("Content-Disposition").getValue();
+        fileName = StringUtils.substringAfter(fileName, "filename=");
+        boolean isTar = StringUtils.endsWith(fileName, ".tar");
+        FileUtils.forceMkdir(parentFile);
+        FileOutputStream fos = null;
+        try {
+            if (isTar) {
+                TarArchiveInputStream tais = new TarArchiveInputStream(is);
+                TarArchiveEntry tarArchiveEntry = null;
+                while ((tarArchiveEntry = tais.getNextTarEntry()) != null) {
+                    String name = tarArchiveEntry.getName();
+                    File tarFile = new File(parentFile, name);
+                    logger.info("start to download file " + tarFile.getName());
+                    BufferedOutputStream bos = null;
+                    try {
+                        bos = new BufferedOutputStream(new FileOutputStream(tarFile));
+                        int read = -1;
+                        byte[] buffer = new byte[1024];
+                        while ((read = tais.read(buffer)) != -1) {
+                            bos.write(buffer, 0, read);
+                        }
+                        logger.info("download file " + tarFile.getName() + " end!");
+                    } finally {
+                        IOUtils.closeQuietly(bos);
+                    }
+                }
+                tais.close();
+            } else {
+                File file = new File(parentFile, fileName);
+                if (!file.isFile()) {
+                    file.createNewFile();
+                }
+                try {
+                    fos = new FileOutputStream(file);
+                    byte[] buffer = new byte[1024];
+                    int len;
+                    long copySize = 0;
+                    long nextPrintProgress = 0;
+                    logger.info("start to download file " + file.getName());
+                    while ((len = is.read(buffer)) != -1) {
+                        fos.write(buffer, 0, len);
+                        copySize += len;
+                        long progress = copySize * 100 / totalSize;
+                        if (progress >= nextPrintProgress) {
+                            logger.info("download " + file.getName() + " progress : " + progress
+                                        + "% , download size : " + copySize + ", total size : " + totalSize);
+                            nextPrintProgress += 10;
+                        }
+                    }
+                    logger.info("download file " + file.getName() + " end!");
+                    fos.flush();
+                } finally {
+                    IOUtils.closeQuietly(fos);
+                }
+            }
+        } finally {
+            IOUtils.closeQuietly(fos);
+        }
+    }
+
+    public static String describeBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
+                                             Date endTime, int pageSize, int pageNumber) throws Exception {
+        Map<String, String> paramMap = new HashMap<String, String>();
+        paramMap.put("Action", "DescribeBinlogFiles");
+        paramMap.put("DBInstanceId", dbInstanceId); // rds实例id
+        paramMap.put("StartTime", formatIso8601Date(startTime));
+        paramMap.put("EndTime", formatIso8601Date(endTime));
+        paramMap.put("PageSize", String.valueOf(pageSize));
+        paramMap.put("PageNumber", String.valueOf(pageNumber));
+        return doRequest(url, paramMap, ak, sk);
+    }
+
+    private static String doRequest(String domin, Map<String, String> param, String ak, String sk) throws Exception {
+        param.put("AccessKeyId", ak);
+        param.put("SignatureMethod", "HMAC-SHA1");
+        param.put("SignatureVersion", SIGNATURE_VERSION);
+        param.put("Version", API_VERSION);
+        param.put("SignatureNonce", UUID.randomUUID().toString());
+        param.put("Format", "JSON");
+        param.put("Timestamp", formatIso8601Date(new Date()));
+        String signStr = generate("POST", param, sk);
+        param.put("Signature", signStr);
+        String request = concatQueryString(param);
+        String url = domin + "?" + request;
+        String result = HttpHelper.post(url, null, Collections.EMPTY_MAP, TIMEOUT);
+        return result;
+    }
+
+    public static String concatQueryString(Map<String, String> parameters) throws UnsupportedEncodingException {
+        if (null == parameters) {
+            return null;
+        }
+        StringBuilder urlBuilder = new StringBuilder("");
+        for (Map.Entry<String, String> entry : parameters.entrySet()) {
+            String key = entry.getKey();
+            String val = entry.getValue();
+            urlBuilder.append(encode(key));
+            if (val != null) {
+                urlBuilder.append("=").append(encode(val));
+            }
+            urlBuilder.append("&");
+        }
+        int strIndex = urlBuilder.length();
+        if (parameters.size() > 0) {
+            urlBuilder.deleteCharAt(strIndex - 1);
+        }
+        return urlBuilder.toString();
+    }
+
+    public static String encode(String value) throws UnsupportedEncodingException {
+        return URLEncoder.encode(value, "UTF-8");
+    }
+
+    private static String formatIso8601Date(Date date) {
+        SimpleDateFormat df = new SimpleDateFormat(ISO8601_DATE_FORMAT);
+        df.setTimeZone(TimeZone.getTimeZone("GMT"));
+        return df.format(date);
+    }
+
+    /**
+     * 使用 HMAC-SHA1 签名方法对对encryptText进行签名
+     *
+     * @param encryptText 被签名的字符串
+     * @param encryptKey 密钥
+     * @return
+     * @throws Exception
+     */
+    public static byte[] HmacSHA1Encrypt(String encryptText, String encryptKey) throws Exception {
+        byte[] data = encryptKey.getBytes(ENCODING);
+        // 根据给定的字节数组构造一个密钥,第二参数指定一个密钥算法的名称
+        SecretKey secretKey = new SecretKeySpec(data, MAC_NAME);
+        // 生成一个指定 Mac 算法 的 Mac 对象
+        Mac mac = Mac.getInstance(MAC_NAME);
+        // 用给定密钥初始化 Mac 对象
+        mac.init(secretKey);
+
+        byte[] text = encryptText.getBytes(ENCODING);
+        // 完成 Mac 操作
+        return mac.doFinal(text);
+    }
+
+    private static String base64(byte input[]) throws UnsupportedEncodingException {
+        return new String(Base64.encodeBase64(input), ENCODING);
+    }
+
+    /** 对参数名称和参数值进行URL编码 **/
+    public static String generate(String method, Map<String, String> parameter, String accessKeySecret)
+                                                                                                       throws Exception {
+        String signString = generateSignString(method, parameter);
+        byte[] signBytes = HmacSHA1Encrypt(signString, accessKeySecret + "&");
+        String signature = base64(signBytes);
+        if ("POST".equals(method)) {
+            return signature;
+        }
+        return URLEncoder.encode(signature, "UTF-8");
+    }
+
+    private static String generateQueryString(TreeMap<String, String> treeMap) {
+        StringBuilder canonicalizedQueryString = new StringBuilder();
+        boolean first = true;
+        for (String key : treeMap.navigableKeySet()) {
+            String value = treeMap.get(key);
+            if (!first) {
+                canonicalizedQueryString.append("&");
+            }
+            first = false;
+            canonicalizedQueryString.append(percentEncode(key)).append("=").append(percentEncode(value));
+        }
+        return canonicalizedQueryString.toString();
+    }
+
+    public static String generateSignString(String httpMethod, Map<String, String> parameter) throws IOException {
+        TreeMap<String, String> sortParameter = new TreeMap<String, String>();
+        sortParameter.putAll(parameter);
+        String canonicalizedQueryString = generateQueryString(sortParameter);
+        if (null == httpMethod) {
+            throw new RuntimeException("httpMethod can not be empty");
+        }
+        /** 构造待签名的字符串* */
+        StringBuilder stringToSign = new StringBuilder();
+        stringToSign.append(httpMethod).append("&");
+        stringToSign.append(percentEncode("/")).append("&");
+        stringToSign.append(percentEncode(canonicalizedQueryString));
+        return stringToSign.toString();
+    }
+
+    public static String percentEncode(String value) {
+        try {
+            return value == null ? null : URLEncoder.encode(value, ENCODING)
+                .replaceAll("\\+", "%20")
+                .replaceAll("\\*", "%2A")
+                .replaceAll("%7E", "~");
+        } catch (Exception e) {
+        }
+        return "";
+    }
+}

+ 111 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

@@ -0,0 +1,111 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds;
+
+import java.io.File;
+import java.util.Date;
+
+import org.apache.commons.lang.StringUtils;
+import org.springframework.util.Assert;
+
+import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+
+/**
+ * 基于rds binlog备份文件的复制
+ * 
+ * @author agapple 2017年10月15日 下午1:27:36
+ * @since 1.0.25
+ */
+public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements CanalEventParser {
+
+    private String url = "https://rds.aliyuncs.com/"; // openapi地址
+    private String accesskey;                        // 云账号的ak
+    private String secretkey;                        // 云账号sk
+    private String instanceId;                       // rds实例id
+    private Long   startTime;
+    private Long   endTime;
+
+    public RdsLocalBinlogEventParser(){
+    }
+
+    public void start() throws CanalParseException {
+        try {
+            Assert.notNull(startTime);
+            Assert.notNull(accesskey);
+            Assert.notNull(secretkey);
+            Assert.notNull(instanceId);
+            Assert.notNull(url);
+            if (endTime == null) {
+                endTime = System.currentTimeMillis();
+            }
+
+            RdsBinlogOpenApi.downloadBinlogFiles(url,
+                accesskey,
+                secretkey,
+                instanceId,
+                new Date(startTime),
+                new Date(endTime),
+                new File(directory));
+
+            // 更新一下时间戳
+            masterPosition = new EntryPosition(startTime);
+        } catch (Throwable e) {
+            logger.error("download binlog failed", e);
+            throw new CanalParseException(e);
+        }
+
+        super.start();
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        if (StringUtils.isNotEmpty(url)) {
+            this.url = url;
+        }
+    }
+
+    public String getAccesskey() {
+        return accesskey;
+    }
+
+    public void setAccesskey(String accesskey) {
+        this.accesskey = accesskey;
+    }
+
+    public String getSecretkey() {
+        return secretkey;
+    }
+
+    public void setSecretkey(String secretkey) {
+        this.secretkey = secretkey;
+    }
+
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    public void setInstanceId(String instanceId) {
+        this.instanceId = instanceId;
+    }
+
+    public Long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(Long startTime) {
+        this.startTime = startTime;
+    }
+
+    public Long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(Long endTime) {
+        this.endTime = endTime;
+    }
+
+}

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaManager.java → parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -37,14 +37,14 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDO;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 
 /**
- * 基于console远程管理 see internal class: CanalTableMeta , ConsoleTableMetaTSDB
+ * 基于db远程管理 see internal class: CanalTableMeta , ConsoleTableMetaTSDB
  *
  * @author agapple 2017年7月27日 下午10:47:55
  * @since 3.2.5
  */
-public class TableMetaManager implements TableMetaTSDB {
+public class DatabaseTableMeta implements TableMetaTSDB {
 
-    private static Logger              logger        = LoggerFactory.getLogger(TableMetaManager.class);
+    private static Logger              logger        = LoggerFactory.getLogger(DatabaseTableMeta.class);
     private static Pattern             pattern       = Pattern.compile("Duplicate entry '.*' for key '*'");
     private static Pattern             h2Pattern     = Pattern.compile("Unique index or primary key violation");
     private static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
@@ -58,7 +58,7 @@ public class TableMetaManager implements TableMetaTSDB {
     private MetaHistoryDAO             metaHistoryDAO;
     private MetaSnapshotDAO            metaSnapshotDAO;
 
-    public TableMetaManager(){
+    public DatabaseTableMeta(){
 
     }
 

+ 27 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogOpenApiTest.java

@@ -0,0 +1,27 @@
+package com.alibaba.otter.canal.parse.inbound.mysql;
+
+import java.io.File;
+import java.util.Date;
+
+import org.apache.commons.lang.time.DateUtils;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogOpenApi;
+
+/**
+ * @author agapple 2017年10月15日 下午2:14:34
+ * @since 1.0.25
+ */
+public class RdsBinlogOpenApiTest {
+
+    public void testSimple() throws Throwable {
+        Date startTime = DateUtils.parseDate("2017-10-13 20:56:58", new String[] { "yyyy-MM-dd HH:mm:ss" });
+        Date endTime = DateUtils.parseDate("2017-10-14 02:57:59", new String[] { "yyyy-MM-dd HH:mm:ss" });
+        RdsBinlogOpenApi.downloadBinlogFiles("https://rds.aliyuncs.com/",
+            "",
+            "",
+            "rm-bp180v4mfjnm157es",
+            startTime,
+            endTime,
+            new File("/tmp/binlog/"));
+    }
+}

+ 117 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsLocalBinlogDumpTest.java

@@ -0,0 +1,117 @@
+package com.alibaba.otter.canal.parse.inbound.mysql;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsLocalBinlogEventParser;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
+import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
+import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
+import com.alibaba.otter.canal.protocol.CanalEntry.Column;
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
+import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
+import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
+import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
+import com.alibaba.otter.canal.sink.exception.CanalSinkException;
+
+/**
+ * @author agapple 2017年10月15日 下午2:16:58
+ * @since 1.0.25
+ */
+public class RdsLocalBinlogDumpTest {
+
+    @Test
+    public void testSimple() {
+        String directory = "/tmp/rds";
+        final RdsLocalBinlogEventParser controller = new RdsLocalBinlogEventParser();
+        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306), "root", "hello"));
+        controller.setConnectionCharset(Charset.forName("UTF-8"));
+        controller.setDirectory(directory);
+        controller.setAccesskey("");
+        controller.setSecretkey("");
+        controller.setInstanceId("");
+        controller.setStartTime(1507860498350L);
+        controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {
+
+            public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
+                                                                                                        throws CanalSinkException,
+                                                                                                        InterruptedException {
+
+                for (Entry entry : entrys) {
+                    if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
+                        || entry.getEntryType() == EntryType.TRANSACTIONEND) {
+                        continue;
+                    }
+
+                    if (entry.getEntryType() == EntryType.ROWDATA) {
+                        RowChange rowChage = null;
+                        try {
+                            rowChage = RowChange.parseFrom(entry.getStoreValue());
+                        } catch (Exception e) {
+                            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                                       + entry.toString(), e);
+                        }
+
+                        EventType eventType = rowChage.getEventType();
+                        System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
+                            entry.getHeader().getLogfileName(),
+                            entry.getHeader().getLogfileOffset(),
+                            entry.getHeader().getSchemaName(),
+                            entry.getHeader().getTableName(),
+                            eventType));
+
+                        for (RowData rowData : rowChage.getRowDatasList()) {
+                            if (eventType == EventType.DELETE) {
+                                print(rowData.getBeforeColumnsList());
+                            } else if (eventType == EventType.INSERT) {
+                                print(rowData.getAfterColumnsList());
+                            } else {
+                                System.out.println("-------> before");
+                                print(rowData.getBeforeColumnsList());
+                                System.out.println("-------> after");
+                                print(rowData.getAfterColumnsList());
+                            }
+                        }
+                    }
+                }
+
+                return true;
+            }
+
+        });
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
+
+            @Override
+            public LogPosition getLatestIndexBy(String destination) {
+                return null;
+            }
+
+            @Override
+            public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+                System.out.println(logPosition);
+            }
+        });
+
+        controller.start();
+
+        try {
+            Thread.sleep(100 * 1000 * 1000L);
+        } catch (InterruptedException e) {
+            Assert.fail(e.getMessage());
+        }
+        controller.stop();
+    }
+
+    private void print(List<Column> columns) {
+        for (Column column : columns) {
+            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
+        }
+    }
+}

+ 1 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaManagerTest.java

@@ -26,7 +26,7 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
 public class TableMetaManagerTest {
 
     @Resource
-    TableMetaManager tableMetaManager;
+    DatabaseTableMeta tableMetaManager;
 
     @Test
     public void testSimple() throws FileNotFoundException, IOException {

+ 1 - 1
parse/src/test/resources/tsdb/derby-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
parse/src/test/resources/tsdb/h2-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 1 - 1
parse/src/test/resources/tsdb/mysql-tsdb.xml

@@ -7,7 +7,7 @@
        default-autowire="byName">
 	
 	<!-- 基于db的实现 -->
-	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
+	<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta">
 		<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
 		<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
 	</bean>

+ 15 - 0
pom.xml

@@ -173,6 +173,21 @@
                 <artifactId>commons-io</artifactId>
                 <version>2.4</version>
             </dependency>
+            <dependency>
+                <groupId>commons-codec</groupId>
+                <artifactId>commons-codec</artifactId>
+                <version>1.9</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-compress</artifactId>
+                <version>1.9</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpclient</artifactId>
+                <version>4.5.1</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.zookeeper</groupId>
                 <artifactId>zookeeper</artifactId>