Selaa lähdekoodia

fixed issue #727 , support rds binlog dump

七锋 6 vuotta sitten
vanhempi
commit
d3f52c6d3d
35 muutettua tiedostoa jossa 556 lisäystä ja 886 poistoa
  1. 3 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/FileLogFetcher.java
  2. 4 1
      deployer/src/main/resources/canal.properties
  3. 5 0
      deployer/src/main/resources/example/instance.properties
  4. 0 32
      deployer/src/main/resources/example/rds_instance.properties
  5. 39 0
      deployer/src/main/resources/spring/base-instance.xml
  6. 2 21
      deployer/src/main/resources/spring/default-instance.xml
  7. 2 21
      deployer/src/main/resources/spring/file-instance.xml
  8. 3 22
      deployer/src/main/resources/spring/group-instance.xml
  9. 0 147
      deployer/src/main/resources/spring/local-instance.xml
  10. 2 21
      deployer/src/main/resources/spring/memory-instance.xml
  11. 2 0
      parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerIdNotMatchException.java
  12. 5 9
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  13. 2 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java
  14. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  15. 34 23
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  16. 11 14
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  17. 19 28
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  18. 19 38
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  19. 20 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  20. 46 16
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java
  21. 87 12
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java
  22. 1 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/HttpHelper.java
  23. 92 75
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java
  24. 23 307
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java
  25. 20 13
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java
  26. 9 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/BinlogFile.java
  27. 6 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/DescribeBinlogFileResult.java
  28. 14 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsBackupPolicy.java
  29. 7 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsItem.java
  30. 35 34
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java
  31. 3 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBackupPolicyRequest.java
  32. 7 8
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBinlogFilesRequest.java
  33. 2 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
  34. 11 10
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java
  35. 17 10
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogOpenApiTest.java

+ 3 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/FileLogFetcher.java

@@ -168,7 +168,9 @@ public final class FileLogFetcher extends LogFetcher {
      * @see com.taobao.tddl.dbsync.binlog.LogFetcher#close()
      */
     public void close() throws IOException {
-        if (fin != null) fin.close();
+        if (fin != null) {
+            fin.close();
+        }
 
         fin = null;
     }

+ 4 - 1
deployer/src/main/resources/canal.properties

@@ -66,6 +66,10 @@ canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MO
 canal.instance.tsdb.dbUsername=canal
 canal.instance.tsdb.dbPassword=canal
 
+# rds oss binlog account
+canal.instance.rds.accesskey =
+canal.instance.rds.secretkey =
+
 #################################################
 ######### 		destinations		############# 
 #################################################
@@ -82,7 +86,6 @@ canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
 canal.instance.global.mode = spring 
 canal.instance.global.lazy = false
 #canal.instance.global.manager.address = 127.0.0.1:1099
-#canal.instance.global.spring.xml = classpath:spring/local-instance.xml
 #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
 canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 #canal.instance.global.spring.xml = classpath:spring/default-instance.xml

+ 5 - 0
deployer/src/main/resources/example/instance.properties

@@ -12,6 +12,11 @@ canal.instance.master.position=
 canal.instance.master.timestamp=
 canal.instance.master.gtid=
 
+# rds oss binlog
+canal.instance.rds.accesskey=
+canal.instance.rds.secretkey=
+canal.instance.rds.instanceId=
+
 # table meta tsdb info
 canal.instance.tsdb.enable=true
 #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb

+ 0 - 32
deployer/src/main/resources/example/rds_instance.properties

@@ -1,32 +0,0 @@
-#################################################
-# rds openapi binlog
-canal.instance.rds.open.url=https://rds.aliyuncs.com/
-canal.instance.rds.open.accesskey=
-canal.instance.rds.open.secretkey=
-canal.instance.rds.instanceId=
-canal.instance.rds.startTime=
-canal.instance.rds.endTime=
-
-# local binlog dir
-canal.instance.parser.directory=${canal.file.data.dir:../conf}/${canal.instance.destination:}/binlog
-# position info
-canal.instance.master.address=127.0.0.1:3306
-
-# table meta tsdb info
-canal.instance.tsdb.enable=true
-canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
-canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
-#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
-#canal.instance.tsdb.dbUsername=canal
-#canal.instance.tsdb.dbPassword=canal
-
-# username/password
-canal.instance.dbUsername=canal
-canal.instance.dbPassword=canal
-canal.instance.defaultDatabaseName=test
-canal.instance.connectionCharset=UTF-8
-# table regex
-canal.instance.filter.regex=test\\..*
-# table black regex
-canal.instance.filter.black.regex=
-#################################################

+ 39 - 0
deployer/src/main/resources/spring/base-instance.xml

@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
+	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
+	xmlns:context="http://www.springframework.org/schema/context"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
+           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
+           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
+           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
+	default-autowire="byName">
+
+	<!-- properties -->
+	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
+		<property name="ignoreResourceNotFound" value="true" />
+		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
+		<property name="locationNames">
+			<list>
+				<value>classpath:canal.properties</value>
+				<value>classpath:${canal.instance.destination:}/instance.properties</value>
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
+	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
+		<property name="propertyEditorRegistrars">
+			<list>
+				<ref bean="socketAddressEditor" />
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="baseEventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy" abstract="true">
+		<property name="accesskey" value="${canal.instance.rds.accesskey:}" />
+		<property name="secretkey" value="${canal.instance.rds.secretkey:}" />
+		<property name="instanceId" value="${canal.instance.rds.instanceId:}" />
+	</bean>
+</beans>

+ 2 - 21
deployer/src/main/resources/spring/default-instance.xml

@@ -10,26 +10,7 @@
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
 	default-autowire="byName">
 
-	<!-- properties -->
-	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
-		<property name="ignoreResourceNotFound" value="true" />
-		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
-		<property name="locationNames">
-			<list>
-				<value>classpath:canal.properties</value>
-				<value>classpath:${canal.instance.destination:}/instance.properties</value>
-			</list>
-		</property>
-	</bean>
-	
-	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
-	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
-		<property name="propertyEditorRegistrars">
-			<list>
-				<ref bean="socketAddressEditor" />
-			</list>
-		</property>
-	</bean>
+	<import resource="classpath:spring/base-instance.xml" />
 	
 	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
 		<property name="destination" value="${canal.instance.destination}" />
@@ -84,7 +65,7 @@
 		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
 
-	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
+	<bean id="eventParser" parent="baseEventParser" >
 		<property name="destination" value="${canal.instance.destination}" />
 		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->

+ 2 - 21
deployer/src/main/resources/spring/file-instance.xml

@@ -10,26 +10,7 @@
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
 	default-autowire="byName">
 
-	<!-- properties -->
-	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
-		<property name="ignoreResourceNotFound" value="true" />
-		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
-		<property name="locationNames">
-			<list>
-				<value>classpath:canal.properties</value>
-				<value>classpath:${canal.instance.destination:}/instance.properties</value>
-			</list>
-		</property>
-	</bean>
-	
-	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
-	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
-		<property name="propertyEditorRegistrars">
-			<list>
-				<ref bean="socketAddressEditor" />
-			</list>
-		</property>
-	</bean>
+	<import resource="classpath:spring/base-instance.xml" />
 	
 	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
 		<property name="destination" value="${canal.instance.destination}" />
@@ -70,7 +51,7 @@
 		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
 
-	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
+	<bean id="eventParser" parent="baseEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
 		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->

+ 3 - 22
deployer/src/main/resources/spring/group-instance.xml

@@ -10,26 +10,7 @@
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
 	default-autowire="byName">
 	
-	<!-- properties -->
-	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
-		<property name="ignoreResourceNotFound" value="true" />
-		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
-		<property name="locationNames">
-			<list>
-				<value>classpath:canal.properties</value>
-				<value>classpath:${canal.instance.destination:}/instance.properties</value>
-			</list>
-		</property>
-	</bean>
-	
-	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
-	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
-		<property name="propertyEditorRegistrars">
-			<list>
-				<ref bean="socketAddressEditor" />
-			</list>
-		</property>
-	</bean>
+	<import resource="classpath:spring/base-instance.xml" />
 
 	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
 		<property name="destination" value="${canal.instance.destination}" />
@@ -76,7 +57,7 @@
 		</property>
 	</bean>
 
-	<bean id="eventParser1" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
+	<bean id="eventParser1" parent="baseEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
 		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->
@@ -174,7 +155,7 @@
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 	
-	<bean id="eventParser2" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
+	<bean id="eventParser2" parent="baseEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
 		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->

+ 0 - 147
deployer/src/main/resources/spring/local-instance.xml

@@ -1,147 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
-	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
-	xmlns:context="http://www.springframework.org/schema/context"
-	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
-           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
-           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
-           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
-	default-autowire="byName">
-
-	<!-- properties -->
-	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
-		<property name="ignoreResourceNotFound" value="true" />
-		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
-		<property name="locationNames">
-			<list>
-				<value>classpath:canal.properties</value>
-				<value>classpath:${canal.instance.destination:}/instance.properties</value>
-			</list>
-		</property>
-	</bean>
-	
-	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
-	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
-		<property name="propertyEditorRegistrars">
-			<list>
-				<ref bean="socketAddressEditor" />
-			</list>
-		</property>
-	</bean>
-	
-	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
-		<property name="destination" value="${canal.instance.destination}" />
-		<property name="eventParser">
-			<ref local="eventParser" />
-		</property>
-		<property name="eventSink">
-			<ref local="eventSink" />
-		</property>
-		<property name="eventStore">
-			<ref local="eventStore" />
-		</property>
-		<property name="metaManager">
-			<ref local="metaManager" />
-		</property>
-		<property name="alarmHandler">
-			<ref local="alarmHandler" />
-		</property>
-	</bean>
-	
-	<!-- 报警处理类 -->
-	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
-	
-	<bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
-		<property name="dataDir" value="${canal.file.data.dir:../conf}" />
-		<property name="period" value="${canal.file.flush.period:1000}" />
-	</bean>
-	
-	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
-		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
-		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
-		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
-		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
-	</bean>
-	
-	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
-		<property name="eventStore" ref="eventStore" />
-		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
-	</bean>
-
-	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsLocalBinlogEventParser">
-		<property name="destination" value="${canal.instance.destination}" />			
-		<property name="alarmHandler" ref="alarmHandler" />
-
-		<!-- 解析过滤处理 -->
-		<property name="eventFilter">
-			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
-				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
-			</bean>
-		</property>
-		
-		<property name="eventBlackFilter">
-			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
-				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
-				<constructor-arg index="1" value="false" />
-			</bean>
-		</property>
-		
-		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
-		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
-			
-		<!-- 解析编码 -->
-		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
-	
-		<!-- 解析位点记录 -->
-		<property name="logPositionManager">
-			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<constructor-arg>
-					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</constructor-arg>
-				<constructor-arg>
-					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<constructor-arg ref="metaManager"/>
-					</bean>
-				</constructor-arg>
-			</bean>
-		</property>
-		
-		<!-- 解析数据库信息 -->
-		<property name="masterInfo">
-			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
-				<property name="address" value="${canal.instance.master.address}" />
-				<property name="username" value="${canal.instance.dbUsername:retl}" />
-				<property name="password" value="${canal.instance.dbPassword:retl}" />
-				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:test}" />
-			</bean>
-		</property>
-		
-		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
-		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
-		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
-		<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
-		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
-		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
-		<property name="needWait" value="${canal.instance.parser.needWait:false}"/>
-		<property name="directory" value="${canal.instance.parser.directory:}"/>
-		
-		<!-- rds相关 -->
-		<property name="url" value="${canal.instance.rds.open.url:}"/>
-		<property name="accesskey" value="${canal.instance.rds.open.accesskey:}"/>
-		<property name="secretkey" value="${canal.instance.rds.open.secretkey:}"/>
-		<property name="instanceId" value="${canal.instance.rds.instanceId:}"/>
-		<property name="startTime" value="${canal.instance.rds.startTime:}"/>
-		<property name="endTime" value="${canal.instance.rds.endTime:}"/>
-		
-		<!--表结构相关-->
-		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
-		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
-		
-		<!-- parallel parser -->
-		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
-		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize}" />
-		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
-	</bean>
-</beans>

+ 2 - 21
deployer/src/main/resources/spring/memory-instance.xml

@@ -10,26 +10,7 @@
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
 	default-autowire="byName">
 	
-	<!-- properties -->
-	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
-		<property name="ignoreResourceNotFound" value="true" />
-		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
-		<property name="locationNames">
-			<list>
-				<value>classpath:canal.properties</value>
-				<value>classpath:${canal.instance.destination:}/instance.properties</value>
-			</list>
-		</property>
-	</bean>
-	
-	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
-	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
-		<property name="propertyEditorRegistrars">
-			<list>
-				<ref bean="socketAddressEditor" />
-			</list>
-		</property>
-	</bean>
+	<import resource="classpath:spring/base-instance.xml" />
 
 	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
 		<property name="destination" value="${canal.instance.destination}" />
@@ -67,7 +48,7 @@
 		<property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
 	</bean>
 
-	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
+	<bean id="eventParser" parent="baseEventParser">
 		<property name="destination" value="${canal.instance.destination}" />
 		<property name="slaveId" value="${canal.instance.mysql.slaveId:0}" />
 		<!-- 心跳配置 -->

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerIdNotMatchException.java

@@ -8,6 +8,8 @@ import com.alibaba.otter.canal.common.CanalException;
  */
 public class ServerIdNotMatchException extends CanalException{
 
+    private static final long serialVersionUID = -6124989280379293953L;
+
     public ServerIdNotMatchException(String errorCode) {
         super(errorCode);
     }

+ 5 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,8 +8,6 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -23,6 +21,7 @@ import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
@@ -97,10 +96,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected int                                    parallelBufferSize         = 256;                                     // 必须为2的幂
     protected MultiStageCoprocessor                  multiStageCoprocessor;
     protected ParserExceptionHandler                 parserExceptionHandler;
-    protected long serverId;
-
-
-
+    protected long                                   serverId;
 
     protected abstract BinlogParser buildParser();
 
@@ -179,7 +175,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         erosaConnection.connect();// 链接
 
                         long queryServerId = erosaConnection.queryServerId();
-                        if (queryServerId != 0){
+                        if (queryServerId != 0) {
                             serverId = queryServerId;
                         }
                         // 4. 获取最后的位置信息
@@ -289,7 +285,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                                 runningInfo.getAddress().toString()), e);
                             sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
                         }
-                        if (parserExceptionHandler!=null){
+                        if (parserExceptionHandler != null) {
                             parserExceptionHandler.handle(e);
                         }
                     } finally {
@@ -441,7 +437,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         return logPosition;
     }
 
-    protected void processSinkError(Throwable e, LogPosition lastPosition, String startBinlogFile, long startPosition) {
+    protected void processSinkError(Throwable e, LogPosition lastPosition, String startBinlogFile, Long startPosition) {
         if (lastPosition != null) {
             logger.warn(String.format("ERROR ## parse this event has an error , last position : [%s]",
                 lastPosition.getPostion()),

+ 2 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse.inbound;
 
 import com.alibaba.otter.canal.common.CanalLifeCycle;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
 
 /**
  * 针对解析器提供一个多阶段协同的处理
@@ -23,7 +24,7 @@ public interface MultiStageCoprocessor extends CanalLifeCycle {
      */
     public boolean publish(LogBuffer buffer);
 
-    public boolean publish(LogBuffer buffer, String binlogFileName);
+    public boolean publish(LogEvent event);
 
     public void reset();
 }

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -134,10 +134,10 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
     protected MultiStageCoprocessor buildMultiStageCoprocessor() {
         MysqlMultiStageCoprocessor mysqlMultiStageCoprocessor = new MysqlMultiStageCoprocessor(parallelBufferSize,
-                parallelThreadSize,
-                (LogEventConvert) binlogParser,
-                transactionBuffer,
-                destination);
+            parallelThreadSize,
+            (LogEventConvert) binlogParser,
+            transactionBuffer,
+            destination);
         mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime);
         return mysqlMultiStageCoprocessor;
     }

+ 34 - 23
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -4,25 +4,24 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.exception.ServerIdNotMatchException;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.exception.ServerIdNotMatchException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
 import com.taobao.tddl.dbsync.binlog.FileLogFetcher;
-import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.LogPosition;
 import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 /**
  * local bin log connection (not real connection)
@@ -40,7 +39,6 @@ public class LocalBinLogConnection implements ErosaConnection {
     private long                serverId;
     private FileParserListener  parserListener;
 
-
     public LocalBinLogConnection(){
     }
 
@@ -92,16 +90,13 @@ public class LocalBinLogConnection implements ErosaConnection {
             while (running) {
                 boolean needContinue = true;
                 LogEvent event = null;
-                // 处理一下binlog文件名
-                event = new RotateLogEvent(context.getLogPosition().getFileName(), 4);
-                func.sink(event);
                 while (fetcher.fetch()) {
                     event = decoder.decode(fetcher, context);
                     if (event == null) {
                         continue;
                     }
-                    if (serverId != 0 && event.getServerId() != serverId){
-                        throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
+                    if (serverId != 0 && event.getServerId() != serverId) {
+                        throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
                     }
 
                     if (!func.sink(event)) {
@@ -168,9 +163,8 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                     LogEvent event = decoder.decode(fetcher, context);
                     if (event != null) {
-
-                        if (serverId != 0 && event.getServerId() != serverId){
-                            throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
+                        if (serverId != 0 && event.getServerId() != serverId) {
+                            throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
                         }
 
                         if (event.getWhen() > timestampSeconds) {
@@ -189,6 +183,9 @@ public class LocalBinLogConnection implements ErosaConnection {
                         } else if (LogEvent.XID_EVENT == event.getHeader().getType()) {
                             lastXidLogFilename = current.getName();
                             lastXidLogFileOffset = event.getLogPos();
+                        } else if (LogEvent.FORMAT_DESCRIPTION_EVENT == event.getHeader().getType()) {
+                            lastXidLogFilename = current.getName();
+                            lastXidLogFileOffset = event.getLogPos();
                         }
                     }
                 }
@@ -225,17 +222,29 @@ public class LocalBinLogConnection implements ErosaConnection {
     @Override
     public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
         File current = new File(directory, binlogfilename);
+        if (!current.exists()) {
+            throw new CanalParseException("binlog:" + binlogfilename + " is not found");
+        }
 
         FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
+        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+        LogContext context = new LogContext();
         try {
             fetcher.open(current, binlogPosition);
+            context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
             while (running) {
                 boolean needContinue = true;
+                LogEvent event = null;
                 while (fetcher.fetch()) {
-                    LogBuffer buffer = fetcher.duplicate();
-                    fetcher.consume(fetcher.limit());
-                    // set filename
-                    if (!coprocessor.publish(buffer, binlogfilename)) {
+                    event = decoder.decode(fetcher, context);
+                    if (event == null) {
+                        continue;
+                    }
+                    if (serverId != 0 && event.getServerId() != serverId) {
+                        throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
+                    }
+
+                    if (!coprocessor.publish(event)) {
                         needContinue = false;
                         break;
                     }
@@ -244,7 +253,6 @@ public class LocalBinLogConnection implements ErosaConnection {
                 fetcher.close(); // 关闭上一个文件
                 parserFinish(binlogfilename);
                 if (needContinue) {// 读取下一个
-
                     File nextFile;
                     if (needWait) {
                         nextFile = binlogs.waitForNextFile(current);
@@ -272,8 +280,8 @@ public class LocalBinLogConnection implements ErosaConnection {
         }
     }
 
-    private void parserFinish(String fileName){
-        if (parserListener != null){
+    private void parserFinish(String fileName) {
+        if (parserListener != null) {
             parserListener.onFinish(fileName);
         }
     }
@@ -306,9 +314,8 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                     LogEvent event = decoder.decode(fetcher, context);
                     if (event != null) {
-
-                        if (serverId != 0 && event.getServerId() != serverId){
-                            throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
+                        if (serverId != 0 && event.getServerId() != serverId) {
+                            throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
                         }
 
                         if (event.getWhen() > timestampSeconds) {
@@ -327,6 +334,9 @@ public class LocalBinLogConnection implements ErosaConnection {
                         } else if (LogEvent.XID_EVENT == event.getHeader().getType()) {
                             lastXidLogFilename = current.getName();
                             lastXidLogFileOffset = event.getLogPos();
+                        } else if (LogEvent.FORMAT_DESCRIPTION_EVENT == event.getHeader().getType()) {
+                            lastXidLogFilename = current.getName();
+                            lastXidLogFileOffset = event.getLogPos();
                         }
                     }
                 }
@@ -410,7 +420,8 @@ public class LocalBinLogConnection implements ErosaConnection {
         this.parserListener = parserListener;
     }
 
-    public interface FileParserListener{
+    public interface FileParserListener {
+
         void onFinish(String fileName);
     }
 

+ 11 - 14
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -13,7 +13,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
@@ -40,20 +39,20 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
-    private static final Logger       logger                    = LoggerFactory.getLogger(MysqlConnection.class);
+    private static final Logger logger      = LoggerFactory.getLogger(MysqlConnection.class);
 
-    private MysqlConnector            connector;
-    private long                      slaveId;
-    private Charset                   charset                   = Charset.forName("UTF-8");
-    private BinlogFormat              binlogFormat;
-    private BinlogImage               binlogImage;
+    private MysqlConnector      connector;
+    private long                slaveId;
+    private Charset             charset     = Charset.forName("UTF-8");
+    private BinlogFormat        binlogFormat;
+    private BinlogImage         binlogImage;
 
     // tsdb releated
-    private AuthenticationInfo        authInfo;
-    protected     int                 connTimeout               = 5 * 1000;                                      // 5秒
-    protected     int                 soTimeout                 = 60 * 60 * 1000;                                // 1小时
+    private AuthenticationInfo  authInfo;
+    protected int               connTimeout = 5 * 1000;                                      // 5秒
+    protected int               soTimeout   = 60 * 60 * 1000;                                // 1小时
     // dump binlog bytes, 暂不包括meta与TSDB
-    private AtomicLong                receivedBinlogBytes;
+    private AtomicLong          receivedBinlogBytes;
 
     public MysqlConnection(){
     }
@@ -338,7 +337,7 @@ public class MysqlConnection implements ErosaConnection {
     public long queryServerId() throws IOException {
         ResultSetPacket resultSetPacket = query("show variables like 'server_id'");
         List<String> fieldValues = resultSetPacket.getFieldValues();
-        if (fieldValues == null || fieldValues.size() != 2){
+        if (fieldValues == null || fieldValues.size() != 2) {
             return 0;
         }
         return NumberUtils.toLong(fieldValues.get(1));
@@ -478,8 +477,6 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
-
-
     public static enum BinlogFormat {
 
         STATEMENT("STATEMENT"), ROW("ROW"), MIXED("MIXED");

+ 19 - 28
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -47,32 +47,32 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
  */
 public class MysqlEventParser extends AbstractMysqlEventParser implements CanalEventParser, CanalHASwitchable {
 
-    private CanalHAController  haController                      = null;
+    private CanalHAController    haController                      = null;
 
-    private int                defaultConnectionTimeoutInSeconds = 30;       // sotimeout
-    private int                receiveBufferSize                 = 64 * 1024;
-    private int                sendBufferSize                    = 64 * 1024;
+    private int                  defaultConnectionTimeoutInSeconds = 30;                // sotimeout
+    private int                  receiveBufferSize                 = 64 * 1024;
+    private int                  sendBufferSize                    = 64 * 1024;
     // 数据库信息
-    protected AuthenticationInfo masterInfo;                                   // 主库
-    protected AuthenticationInfo standbyInfo;                                  // 备库
+    protected AuthenticationInfo masterInfo;                                            // 主库
+    protected AuthenticationInfo standbyInfo;                                           // 备库
     // binlog信息
     protected EntryPosition      masterPosition;
     protected EntryPosition      standbyPosition;
-    private long               slaveId;                                      // 链接到mysql的slave
+    private long                 slaveId;                                               // 链接到mysql的slave
     // 心跳检查信息
-    private String             detectingSQL;                                 // 心跳sql
-    private MysqlConnection    metaConnection;                               // 查询meta信息的链接
-    private TableMetaCache     tableMetaCache;                               // 对应meta
-    private int                fallbackIntervalInSeconds         = 60;       // 切换回退时间
-    private BinlogFormat[]     supportBinlogFormats;                         // 支持的binlogFormat,如果设置会执行强校验
-    private BinlogImage[]      supportBinlogImages;                          // 支持的binlogImage,如果设置会执行强校验
+    private String               detectingSQL;                                          // 心跳sql
+    private MysqlConnection      metaConnection;                                        // 查询meta信息的链接
+    private TableMetaCache       tableMetaCache;                                        // 对应meta
+    private int                  fallbackIntervalInSeconds         = 60;                // 切换回退时间
+    private BinlogFormat[]       supportBinlogFormats;                                  // 支持的binlogFormat,如果设置会执行强校验
+    private BinlogImage[]        supportBinlogImages;                                   // 支持的binlogImage,如果设置会执行强校验
 
     // update by yishun.chen,特殊异常处理参数
-    private       int                 dumpErrorCount            = 0;        // binlogDump失败异常计数
-    private       int                 dumpErrorCountThreshold   = 2;        // binlogDump失败异常计数阀值
+    private int                  dumpErrorCount                    = 0;                 // binlogDump失败异常计数
+    private int                  dumpErrorCountThreshold           = 2;                 // binlogDump失败异常计数阀值
 
     // instance received binlog bytes
-    private final AtomicLong          receivedBinlogBytes       = new AtomicLong(0L);
+    private final AtomicLong     receivedBinlogBytes               = new AtomicLong(0L);
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
@@ -756,16 +756,9 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         Long logposTimestamp = entry.getHeader().getExecuteTime();
                         Long serverId = entry.getHeader().getServerId();
 
-                        if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
-                            || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] {
-                                        logfilename, logfileoffset, logposTimestamp, startTimestamp });
-                            }
-                            // 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
-                            if (logposTimestamp >= startTimestamp) {
-                                return false;
-                            }
+                        // 如果最小的一条记录都不满足条件,可直接退出
+                        if (logposTimestamp >= startTimestamp) {
+                            return false;
                         }
 
                         if (StringUtils.equals(endPosition.getJournalName(), logfilename)
@@ -914,8 +907,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         this.dumpErrorCountThreshold = dumpErrorCountThreshold;
     }
 
-
-
     public AtomicLong getReceivedBinlogBytes() {
         return this.receivedBinlogBytes;
     }

+ 19 - 38
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -5,8 +5,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
-import org.apache.commons.lang.StringUtils;
-
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
@@ -32,7 +30,6 @@ import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
-import com.taobao.tddl.dbsync.binlog.LogPosition;
 import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
@@ -53,7 +50,7 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
  */
 public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
 
-    private static final int             maxFullTimes               = 10;
+    private static final int             maxFullTimes = 10;
     private LogEventConvert              logEventConvert;
     private EventTransactionBuffer       transactionBuffer;
     private ErosaConnection              connection;
@@ -141,14 +138,18 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         super.stop();
     }
 
+    public boolean publish(LogBuffer buffer) {
+        return this.publish(buffer, null);
+    }
+
     /**
      * 网络数据投递
      */
-    public boolean publish(LogBuffer buffer) {
-        return publish(buffer, null);
+    public boolean publish(LogEvent event) {
+        return this.publish(null, event);
     }
 
-    public boolean publish(LogBuffer buffer, String binlogFileName) {
+    private boolean publish(LogBuffer buffer, LogEvent event) {
         if (!isStart()) {
             if (exception != null) {
                 throw exception;
@@ -169,10 +170,11 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         do {
             try {
                 long next = disruptorMsgBuffer.tryNext();
-                MessageEvent event = disruptorMsgBuffer.get(next);
-                event.setBuffer(buffer);
-                if (binlogFileName != null) {
-                    event.setBinlogFileName(binlogFileName);
+                MessageEvent data = disruptorMsgBuffer.get(next);
+                if (buffer != null) {
+                    data.setBuffer(buffer);
+                } else {
+                    data.setEvent(event);
                 }
                 disruptorMsgBuffer.publish(next);
                 if (fullTimes > 0) {
@@ -184,7 +186,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                     blockingStart = System.nanoTime();
                 }
                 // park
-                //LockSupport.parkNanos(1L);
+                // LockSupport.parkNanos(1L);
                 applyWait(++fullTimes);
                 interupted = Thread.interrupted();
                 if (fullTimes % 1000 == 0) {
@@ -217,7 +219,6 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         start();
     }
 
-
     private class SimpleParserStage implements EventHandler<MessageEvent>, LifecycleAware {
 
         private LogDecoder decoder;
@@ -230,23 +231,13 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
             try {
-                LogBuffer buffer = event.getBuffer();
-                if (StringUtils.isNotEmpty(event.getBinlogFileName())
-                    && (context.getLogPosition() == null || !context.getLogPosition()
-                        .getFileName()
-                        .equals(event.getBinlogFileName()))) {
-                    // set roate binlog file name
-                    if (context.getLogPosition() == null) {
-                        context.setLogPosition(new LogPosition(event.getBinlogFileName(), 0));
-                    } else {
-                        context.setLogPosition(new LogPosition(event.getBinlogFileName(), context.getLogPosition()
-                            .getPosition()));
-                    }
+                LogEvent logEvent = event.getEvent();
+                if (logEvent == null) {
+                    LogBuffer buffer = event.getBuffer();
+                    logEvent = decoder.decode(buffer, context);
+                    event.setEvent(logEvent);
                 }
 
-                LogEvent logEvent = decoder.decode(buffer, context);
-                event.setEvent(logEvent);
-
                 int eventType = logEvent.getHeader().getType();
                 TableMeta tableMeta = null;
                 boolean needDmlParse = false;
@@ -347,7 +338,6 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
                 // clear for gc
                 event.setBuffer(null);
-                event.setBinlogFileName(null);
                 event.setEvent(null);
                 event.setTable(null);
                 event.setEntry(null);
@@ -371,21 +361,12 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
     class MessageEvent {
 
-        private String           binlogFileName;      // for local binlog parse
         private LogBuffer        buffer;
         private CanalEntry.Entry entry;
         private boolean          needDmlParse = false;
         private TableMeta        table;
         private LogEvent         event;
 
-        public String getBinlogFileName() {
-            return binlogFileName;
-        }
-
-        public void setBinlogFileName(String binlogFileName) {
-            this.binlogFileName = binlogFileName;
-        }
-
         public LogBuffer getBuffer() {
             return buffer;
         }

+ 20 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -10,7 +10,6 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 
-import com.taobao.tddl.dbsync.binlog.event.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -41,7 +40,23 @@ import com.alibaba.otter.canal.protocol.CanalEntry.Type;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.google.protobuf.ByteString;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.HeartbeatLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.LogHeader;
+import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RandLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer;
+import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
+import com.taobao.tddl.dbsync.binlog.event.UnknownLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
 
 /**
@@ -872,11 +887,14 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private TableMeta getTableMeta(String dbName, String tbName, boolean useCache, EntryPosition position) {
         try {
             return tableMetaCache.getTableMeta(dbName, tbName, useCache, position);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             String message = ExceptionUtils.getRootCauseMessage(e);
             if (filterTableError) {
                 if (StringUtils.contains(message, "errorNumber=1146") && StringUtils.contains(message, "doesn't exist")) {
                     return null;
+                } else if (StringUtils.contains(message, "errorNumber=1142")
+                           && StringUtils.contains(message, "command denied")) {
+                    return null;
                 }
             }
 

+ 46 - 16
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

@@ -25,13 +25,14 @@ import com.alibaba.otter.canal.parse.exception.CanalParseException;
  */
 public class BinLogFileQueue {
 
-    private String        baseName       = "mysql-bin.";
-    private List<File>    binlogs        = new ArrayList<File>();
-    private File          directory;
-    private ReentrantLock lock           = new ReentrantLock();
-    private Condition     nextCondition  = lock.newCondition();
-    private Timer         timer          = new Timer(true);
-    private long          reloadInterval = 10 * 1000L;           // 10秒
+    private String              baseName       = "mysql-bin.";
+    private List<File>          binlogs        = new ArrayList<File>();
+    private File                directory;
+    private ReentrantLock       lock           = new ReentrantLock();
+    private Condition           nextCondition  = lock.newCondition();
+    private Timer               timer          = new Timer(true);
+    private long                reloadInterval = 10 * 1000L;           // 10秒
+    private CanalParseException exception      = null;
 
     public BinLogFileQueue(String directory){
         this(new File(directory));
@@ -52,9 +53,24 @@ public class BinLogFileQueue {
         timer.scheduleAtFixedRate(new TimerTask() {
 
             public void run() {
-                List<File> files = listBinlogFiles();
-                for (File file : files) {
-                    offer(file);
+                try {
+                    // File errorFile = new File(BinLogFileQueue.this.directory,
+                    // errorFileName);
+                    // if (errorFile.isFile() && errorFile.exists()) {
+                    // String text = StringUtils.join(IOUtils.readLines(new
+                    // FileInputStream(errorFile)), "\n");
+                    // exception = new CanalParseException(text);
+                    // }
+                    List<File> files = listBinlogFiles();
+                    for (File file : files) {
+                        offer(file);
+                    }
+                } catch (Throwable e) {
+                    exception = new CanalParseException(e);
+                }
+
+                if (exception != null) {
+                    offer(null);
                 }
             }
         }, reloadInterval, reloadInterval);
@@ -69,6 +85,10 @@ public class BinLogFileQueue {
     public File getNextFile(File pre) {
         try {
             lock.lockInterruptibly();
+            if (exception != null) {
+                throw exception;
+            }
+
             if (binlogs.size() == 0) {
                 return null;
             } else {
@@ -94,6 +114,10 @@ public class BinLogFileQueue {
     public File getBefore(File file) {
         try {
             lock.lockInterruptibly();
+            if (exception != null) {
+                throw exception;
+            }
+
             if (binlogs.size() == 0) {
                 return null;
             } else {
@@ -130,6 +154,9 @@ public class BinLogFileQueue {
                 nextCondition.await();// 等待新文件
             }
 
+            if (exception != null) {
+                throw exception;
+            }
             if (pre == null) {// 第一次
                 return binlogs.get(0);
             } else {
@@ -170,13 +197,16 @@ public class BinLogFileQueue {
     private boolean offer(File file) {
         try {
             lock.lockInterruptibly();
-            if (!binlogs.contains(file)) {
-                binlogs.add(file);
-                nextCondition.signalAll();// 唤醒
-                return true;
-            } else {
-                return false;
+            if (file != null) {
+                if (!binlogs.contains(file)) {
+                    binlogs.add(file);
+                    nextCondition.signalAll();// 唤醒
+                    return true;
+                }
             }
+
+            nextCondition.signalAll();// 唤醒
+            return false;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return false;

+ 87 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

@@ -7,6 +7,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -14,6 +16,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import javax.net.ssl.SSLContext;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
@@ -23,8 +28,16 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.TrustStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +55,7 @@ public class BinlogDownloadQueue {
     private LinkedBlockingQueue<BinlogFile> downloadQueue = new LinkedBlockingQueue<BinlogFile>();
     private LinkedBlockingQueue<Runnable>   taskQueue     = new LinkedBlockingQueue<Runnable>();
     private LinkedList<BinlogFile>          binlogList;
-    private final int                       batchSize;
+    private final int                       batchFileSize;
     private Thread                          downloadThread;
     public boolean                          running       = true;
     private final String                    destDir;
@@ -50,9 +63,9 @@ public class BinlogDownloadQueue {
     private int                             currentSize;
     private String                          lastDownload;
 
-    public BinlogDownloadQueue(List<BinlogFile> downloadQueue, int batchSize, String destDir) throws IOException{
+    public BinlogDownloadQueue(List<BinlogFile> downloadQueue, int batchFileSize, String destDir) throws IOException{
         this.binlogList = new LinkedList(downloadQueue);
-        this.batchSize = batchSize;
+        this.batchFileSize = batchFileSize;
         this.destDir = destDir;
         this.currentSize = 0;
         prepareBinlogList();
@@ -83,11 +96,12 @@ public class BinlogDownloadQueue {
         if (downloadThread != null) {
             return;
         }
-        downloadThread = new Thread(new DownloadThread());
+        downloadThread = new Thread(new DownloadThread(), "download-" + destDir);
+        downloadThread.setDaemon(true);
         downloadThread.start();
     }
 
-    public BinlogFile tryOne() throws IOException {
+    public BinlogFile tryOne() throws Throwable {
         BinlogFile binlogFile = binlogList.poll();
         download(binlogFile);
         hostId = binlogFile.getHostInstanceID();
@@ -121,7 +135,7 @@ public class BinlogDownloadQueue {
     }
 
     public void prepare() throws InterruptedException {
-        for (int i = this.currentSize; i < batchSize && !binlogList.isEmpty(); i++) {
+        for (int i = this.currentSize; i < batchFileSize && !binlogList.isEmpty(); i++) {
             BinlogFile binlogFile = null;
             while (!binlogList.isEmpty()) {
                 binlogFile = binlogList.poll();
@@ -150,11 +164,37 @@ public class BinlogDownloadQueue {
         downloadQueue.clear();
     }
 
-    private void download(BinlogFile binlogFile) throws IOException {
+    private void download(BinlogFile binlogFile) throws Throwable {
         String downloadLink = binlogFile.getDownloadLink();
         String fileName = binlogFile.getFileName();
+
+        downloadLink = downloadLink.trim();
+        CloseableHttpClient httpClient = null;
+        if (downloadLink.startsWith("https")) {
+            HttpClientBuilder builder = HttpClientBuilder.create();
+            builder.setMaxConnPerRoute(50);
+            builder.setMaxConnTotal(100);
+            // 创建支持忽略证书的https
+            final SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
+
+                @Override
+                public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+                    return true;
+                }
+            }).build();
+
+            httpClient = HttpClientBuilder.create()
+                .setSSLContext(sslContext)
+                .setConnectionManager(new PoolingHttpClientConnectionManager(RegistryBuilder.<ConnectionSocketFactory> create()
+                    .register("http", PlainConnectionSocketFactory.INSTANCE)
+                    .register("https", new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE))
+                    .build()))
+                .build();
+        } else {
+            httpClient = HttpClientBuilder.create().setMaxConnPerRoute(50).setMaxConnTotal(100).build();
+        }
+
         HttpGet httpGet = new HttpGet(downloadLink);
-        CloseableHttpClient httpClient = HttpClientBuilder.create().setMaxConnPerRoute(50).setMaxConnTotal(100).build();
         RequestConfig requestConfig = RequestConfig.custom()
             .setConnectTimeout(TIMEOUT)
             .setConnectionRequestTimeout(TIMEOUT)
@@ -187,6 +227,9 @@ public class BinlogDownloadQueue {
                     String name = tarArchiveEntry.getName();
                     File tarFile = new File(parentFile, name + ".tmp");
                     logger.info("start to download file " + tarFile.getName());
+                    if (tarFile.exists()) {
+                        tarFile.delete();
+                    }
                     BufferedOutputStream bos = null;
                     try {
                         bos = new BufferedOutputStream(new FileOutputStream(tarFile));
@@ -204,6 +247,10 @@ public class BinlogDownloadQueue {
                 tais.close();
             } else {
                 File file = new File(parentFile, fileName + ".tmp");
+                if (file.exists()) {
+                    file.delete();
+                }
+
                 if (!file.isFile()) {
                     file.createNewFile();
                 }
@@ -245,17 +292,45 @@ public class BinlogDownloadQueue {
         @Override
         public void run() {
             while (running) {
+                BinlogFile binlogFile = null;
                 try {
-                    BinlogFile binlogFile = downloadQueue.poll(5000, TimeUnit.MILLISECONDS);
+                    binlogFile = downloadQueue.poll(5000, TimeUnit.MILLISECONDS);
                     if (binlogFile != null) {
-                        download(binlogFile);
+                        int retry = 1;
+                        while (true) {
+                            try {
+                                download(binlogFile);
+                                break;
+                            } catch (Throwable e) {
+                                if (retry % 10 == 0) {
+                                    retry = retry + 1;
+                                    try {
+                                        logger.warn("download failed + " + binlogFile.toString() + "], retry : "
+                                                    + retry, e);
+                                        // File errorFile = new File(destDir,
+                                        // "error.txt");
+                                        // FileWriter writer = new
+                                        // FileWriter(errorFile);
+                                        // writer.write(ExceptionUtils.getFullStackTrace(e));
+                                        // writer.flush();
+                                        // IOUtils.closeQuietly(writer);
+                                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100 * retry));
+                                    } catch (Throwable e1) {
+                                        logger.error("write error failed", e1);
+                                    }
+                                } else {
+                                    retry = retry + 1;
+                                }
+                            }
+                        }
                     }
+
                     Runnable runnable = taskQueue.poll(5000, TimeUnit.MILLISECONDS);
                     if (runnable != null) {
                         runnable.run();
                     }
-                } catch (Exception e) {
-                    e.printStackTrace();
+                } catch (Throwable e) {
+                    logger.error("task process failed", e);
                 }
             }
 

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/HttpHelper.java

@@ -155,6 +155,7 @@ public class HttpHelper {
                     .register("https", new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE))
                     .build()))
                 .build();
+
             // ---------------- 创建支持https 的client成功---------
 
             URI uri = new URIBuilder(url).build();

+ 92 - 75
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

@@ -4,93 +4,109 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.ParserExceptionHandler;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
 
 /**
+ * aliyun rds的binlog parser支持
+ * 
+ * <pre>
+ * 注意点:aliyun的binlog会有定期清理并备份到oss上, 这里实现了一份自动下载oss+rds binlog的机制
+ * </pre>
+ * 
  * @author chengjin.lyf on 2018/7/20 上午10:52
  * @since 1.0.25
  */
 public class RdsBinlogEventParserProxy extends MysqlEventParser {
 
-    private String                    rdsOpenApiUrl        = "https://rds.aliyuncs.com/";    // openapi地址
-    private String                    accesskey;                                             // 云账号的ak
-    private String                    secretkey;                                             // 云账号sk
-    private String                    instanceId;                                            // rds实例id
-    private Long                      startTime;
-    private Long                      endTime;
-    private String                    directory;                                             // binlog
-                                                                                              // 目录
-    private int                       batchSize            = 4;                              // 最多下载的binlog文件数量
-
-    private RdsLocalBinlogEventParser rdsBinlogEventParser = new RdsLocalBinlogEventParser();
-    private ExecutorService           executorService      = Executors.newSingleThreadExecutor(new ThreadFactory() {
-
-                                                               @Override
-                                                               public Thread newThread(Runnable r) {
-                                                                   Thread t = new Thread(r, "rds-binlog-daemon-thread");
-                                                                   t.setDaemon(true);
-                                                                   return t;
-                                                               }
-                                                           });
+    private String                    rdsOpenApiUrl             = "https://rds.aliyuncs.com/";    // openapi地址
+    private String                    accesskey;                                                  // 云账号的ak
+    private String                    secretkey;                                                  // 云账号sk
+    private String                    instanceId;                                                 // rds实例id
+    private String                    directory;                                                  // binlog目录
+    private int                       batchFileSize             = 4;                              // 最多下载的binlog文件数量
+
+    private RdsLocalBinlogEventParser rdsLocalBinlogEventParser = new RdsLocalBinlogEventParser();
+    private ExecutorService           executorService           = Executors.newSingleThreadExecutor(new ThreadFactory() {
+
+                                                                    @Override
+                                                                    public Thread newThread(Runnable r) {
+                                                                        Thread t = new Thread(r,
+                                                                            "rds-binlog-daemon-thread");
+                                                                        t.setDaemon(true);
+                                                                        return t;
+                                                                    }
+                                                                });
 
     @Override
     public void start() {
-        final ParserExceptionHandler targetHandler = this.getParserExceptionHandler();
-        rdsBinlogEventParser.setLogPositionManager(this.getLogPositionManager());
-        rdsBinlogEventParser.setDestination(destination);
-        rdsBinlogEventParser.setAlarmHandler(this.getAlarmHandler());
-        rdsBinlogEventParser.setConnectionCharset(this.connectionCharset);
-        rdsBinlogEventParser.setConnectionCharsetNumber(this.connectionCharsetNumber);
-        rdsBinlogEventParser.setEnableTsdb(this.enableTsdb);
-        rdsBinlogEventParser.setEventBlackFilter(this.eventBlackFilter);
-        rdsBinlogEventParser.setFilterQueryDcl(this.filterQueryDcl);
-        rdsBinlogEventParser.setFilterQueryDdl(this.filterQueryDdl);
-        rdsBinlogEventParser.setFilterQueryDml(this.filterQueryDml);
-        rdsBinlogEventParser.setFilterRows(this.filterRows);
-        rdsBinlogEventParser.setFilterTableError(this.filterTableError);
-        rdsBinlogEventParser.setIsGTIDMode(this.isGTIDMode);
-        rdsBinlogEventParser.setMasterInfo(this.masterInfo);
-        rdsBinlogEventParser.setEventFilter(this.eventFilter);
-        rdsBinlogEventParser.setMasterPosition(this.masterPosition);
-        rdsBinlogEventParser.setTransactionSize(this.transactionSize);
-        rdsBinlogEventParser.setUrl(this.rdsOpenApiUrl);
-        rdsBinlogEventParser.setAccesskey(this.accesskey);
-        rdsBinlogEventParser.setSecretkey(this.secretkey);
-        rdsBinlogEventParser.setInstanceId(this.instanceId);
-        rdsBinlogEventParser.setEventSink(eventSink);
-        rdsBinlogEventParser.setDirectory(directory);
-        rdsBinlogEventParser.setBatchSize(batchSize);
-        rdsBinlogEventParser.setFinishListener(new RdsLocalBinlogEventParser.ParseFinishListener() {
-
-            @Override
-            public void onFinish() {
-                executorService.execute(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        rdsBinlogEventParser.stop();
-                        RdsBinlogEventParserProxy.this.start();
-                    }
-                });
-
+        if (StringUtils.isNotEmpty(accesskey) && StringUtils.isNotEmpty(secretkey)
+            && StringUtils.isNotEmpty(instanceId)) {
+            final ParserExceptionHandler targetHandler = this.getParserExceptionHandler();
+            if (directory == null) {
+                directory = System.getProperty("java.io.tmpdir", "/tmp") + "/" + destination;
             }
-        });
-        this.setParserExceptionHandler(new ParserExceptionHandler() {
-
-            @Override
-            public void handle(Throwable e) {
-                handleMysqlParserException(e);
-                if (targetHandler != null) {
-                    targetHandler.handle(e);
+            rdsLocalBinlogEventParser.setLogPositionManager(this.getLogPositionManager());
+            rdsLocalBinlogEventParser.setDestination(destination);
+            rdsLocalBinlogEventParser.setAlarmHandler(this.getAlarmHandler());
+            rdsLocalBinlogEventParser.setConnectionCharset(this.connectionCharset);
+            rdsLocalBinlogEventParser.setConnectionCharsetNumber(this.connectionCharsetNumber);
+            rdsLocalBinlogEventParser.setEnableTsdb(this.enableTsdb);
+            rdsLocalBinlogEventParser.setEventBlackFilter(this.eventBlackFilter);
+            rdsLocalBinlogEventParser.setFilterQueryDcl(this.filterQueryDcl);
+            rdsLocalBinlogEventParser.setFilterQueryDdl(this.filterQueryDdl);
+            rdsLocalBinlogEventParser.setFilterQueryDml(this.filterQueryDml);
+            rdsLocalBinlogEventParser.setFilterRows(this.filterRows);
+            rdsLocalBinlogEventParser.setFilterTableError(this.filterTableError);
+            // rdsLocalBinlogEventParser.setIsGTIDMode(this.isGTIDMode);
+            rdsLocalBinlogEventParser.setMasterInfo(this.masterInfo);
+            rdsLocalBinlogEventParser.setEventFilter(this.eventFilter);
+            rdsLocalBinlogEventParser.setMasterPosition(this.masterPosition);
+            rdsLocalBinlogEventParser.setTransactionSize(this.transactionSize);
+            rdsLocalBinlogEventParser.setUrl(this.rdsOpenApiUrl);
+            rdsLocalBinlogEventParser.setAccesskey(this.accesskey);
+            rdsLocalBinlogEventParser.setSecretkey(this.secretkey);
+            rdsLocalBinlogEventParser.setInstanceId(this.instanceId);
+            rdsLocalBinlogEventParser.setEventSink(eventSink);
+            rdsLocalBinlogEventParser.setDirectory(directory);
+            rdsLocalBinlogEventParser.setBatchFileSize(batchFileSize);
+            rdsLocalBinlogEventParser.setParallel(this.parallel);
+            rdsLocalBinlogEventParser.setParallelBufferSize(this.parallelBufferSize);
+            rdsLocalBinlogEventParser.setParallelThreadSize(this.parallelThreadSize);
+            rdsLocalBinlogEventParser.setFinishListener(new RdsLocalBinlogEventParser.ParseFinishListener() {
+
+                @Override
+                public void onFinish() {
+                    executorService.execute(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            rdsLocalBinlogEventParser.stop();
+                            RdsBinlogEventParserProxy.this.start();
+                        }
+                    });
+
                 }
-            }
-        });
+            });
+            this.setParserExceptionHandler(new ParserExceptionHandler() {
+
+                @Override
+                public void handle(Throwable e) {
+                    handleMysqlParserException(e);
+                    if (targetHandler != null) {
+                        targetHandler.handle(e);
+                    }
+                }
+            });
+        }
+
         super.start();
     }
 
-    public void handleMysqlParserException(Throwable throwable) {
+    private void handleMysqlParserException(Throwable throwable) {
         if (throwable instanceof PositionNotFoundException) {
             logger.info("remove rds not found position, try download rds binlog!");
             executorService.execute(new Runnable() {
@@ -101,12 +117,12 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
                         logger.info("stop mysql parser!");
                         RdsBinlogEventParserProxy rdsBinlogEventParserProxy = RdsBinlogEventParserProxy.this;
                         long serverId = rdsBinlogEventParserProxy.getServerId();
-                        rdsBinlogEventParser.setServerId(serverId);
+                        rdsLocalBinlogEventParser.setServerId(serverId);
                         rdsBinlogEventParserProxy.stop();
                         logger.info("start rds mysql binlog parser!");
-                        rdsBinlogEventParser.start();
-                    } catch (Exception e) {
-                        e.printStackTrace();
+                        rdsLocalBinlogEventParser.start();
+                    } catch (Throwable e) {
+                        logger.info("handle exception failed", e);
                     }
                 }
             });
@@ -143,7 +159,8 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
         this.directory = directory;
     }
 
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
+    public void setBatchFileSize(int batchFileSize) {
+        this.batchFileSize = batchFileSize;
     }
+
 }

+ 23 - 307
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java

@@ -1,51 +1,20 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.rds;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URLEncoder;
-import java.text.SimpleDateFormat;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import javax.crypto.Mac;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.SecretKeySpec;
 
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsBackupPolicy;
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsItem;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.request.DescribeBackupPolicyRequest;
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.request.DescribeBinlogFilesRequest;
 
 /**
@@ -54,13 +23,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.rds.request.DescribeBinlogFil
  */
 public class RdsBinlogOpenApi {
 
-    protected static final Logger logger              = LoggerFactory.getLogger(RdsBinlogOpenApi.class);
-    private static final String   ISO8601_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
-    private static final int      TIMEOUT             = 10000;
-    private static final String   ENCODING            = "UTF-8";
-    private static final String   MAC_NAME            = "HmacSHA1";
-    private static final String   API_VERSION         = "2014-08-15";
-    private static final String   SIGNATURE_VERSION   = "1.0";
+    protected static final Logger logger = LoggerFactory.getLogger(RdsBinlogOpenApi.class);
 
     public static List<BinlogFile> listBinlogFiles(String url, String ak, String sk, String dbInstanceId,
                                                    Date startTime, Date endTime) {
@@ -106,279 +69,32 @@ public class RdsBinlogOpenApi {
         return Collections.EMPTY_LIST;
     }
 
-    public static void downloadBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
-                                           Date endTime, File destDir) throws Throwable {
-        int pageSize = 100;
-        int pageNumber = 0;
-        int pageRecordCount = 1;
-        String hostInstanceID = null;
-        while (pageRecordCount > 0 && pageRecordCount <= pageSize) {
-            pageNumber += 1;
-            String result = describeBinlogFiles(url, ak, sk, dbInstanceId, startTime, endTime, pageSize, pageNumber);
-            JSONObject jsobObj = JSON.parseObject(result);
-            pageRecordCount = jsobObj.getInteger("PageRecordCount");
-
-            if (pageRecordCount > 0) {
-                FileUtils.forceMkdir(destDir);
-                File hostIdFile = new File(destDir, "hostId");
-                if (hostIdFile.exists()) {
-                    List<String> lines = IOUtils.readLines(new FileInputStream(hostIdFile));
-                    hostInstanceID = StringUtils.join(lines, "\n");
-                }
-
-                String itemStr = jsobObj.getString("Items");
-                JSONObject binLogFileObj = JSONObject.parseObject(itemStr);
-                JSONArray items = binLogFileObj.getJSONArray("BinLogFile");
-                if (items == null || items.isEmpty()) {
-                    continue;
-                }
-                for (int i = 0; i < items.size(); i++) {
-                    JSONObject item = (JSONObject) items.get(i);
-                    String oneHostInstanceID = item.getString("HostInstanceID");
-                    if (hostInstanceID == null) {
-                        hostInstanceID = oneHostInstanceID;
-                        FileOutputStream hostIdFileOut = null;
-                        try {
-                            hostIdFileOut = new FileOutputStream(hostIdFile);
-                            hostIdFileOut.write(oneHostInstanceID.getBytes());
-                            hostIdFileOut.flush();
-                        } finally {
-                            IOUtils.closeQuietly(hostIdFileOut);
-                        }
-                    }
-
-                    if (hostInstanceID.equals(oneHostInstanceID)) { // 只选择一个host下载
-                        String downloadLink = item.getString("DownloadLink");
-                        String fileName = StringUtils.substringBetween(downloadLink, "mysql-bin.", ".tar");
-                        if (StringUtils.isNotEmpty(fileName)) {
-                            File currentFile = new File(destDir, "mysql-bin." + fileName);
-                            if (currentFile.isFile() && currentFile.exists()) {
-                                // 检查一下文件是否存在,存在就就没必要下载了
-                                continue;
-                            }
-                        }
-
-                        HttpGet httpGet = new HttpGet(downloadLink);
-                        CloseableHttpClient httpClient = HttpClientBuilder.create()
-                            .setMaxConnPerRoute(50)
-                            .setMaxConnTotal(100)
-                            .build();
-                        RequestConfig requestConfig = RequestConfig.custom()
-                            .setConnectTimeout(TIMEOUT)
-                            .setConnectionRequestTimeout(TIMEOUT)
-                            .setSocketTimeout(TIMEOUT)
-                            .build();
-                        httpGet.setConfig(requestConfig);
-                        HttpResponse response = httpClient.execute(httpGet);
-                        int statusCode = response.getStatusLine().getStatusCode();
-                        if (statusCode != HttpResponseStatus.OK.code()) {
-                            throw new RuntimeException("download failed , url:" + downloadLink + " , statusCode:"
-                                                       + statusCode);
-                        }
-                        saveFile(destDir, response);
-                    }
-                }
+    public static RdsBackupPolicy queryBinlogBackupPolicy(String url, String ak, String sk, String dbInstanceId) {
+        DescribeBackupPolicyRequest request = new DescribeBackupPolicyRequest();
+        if (StringUtils.isNotEmpty(url)) {
+            try {
+                URI uri = new URI(url);
+                request.setEndPoint(uri.getHost());
+            } catch (URISyntaxException e) {
+                logger.error("resolve url host failed, will use default rds endpoint!");
             }
         }
-    }
-
-    private static void saveFile(File parentFile, HttpResponse response) throws IOException {
-        InputStream is = response.getEntity().getContent();
-        long totalSize = Long.parseLong(response.getFirstHeader("Content-Length").getValue());
-        String fileName = response.getFirstHeader("Content-Disposition").getValue();
-        fileName = StringUtils.substringAfter(fileName, "filename=");
-        boolean isTar = StringUtils.endsWith(fileName, ".tar");
-        FileUtils.forceMkdir(parentFile);
-        FileOutputStream fos = null;
-        try {
-            if (isTar) {
-                TarArchiveInputStream tais = new TarArchiveInputStream(is);
-                TarArchiveEntry tarArchiveEntry = null;
-                while ((tarArchiveEntry = tais.getNextTarEntry()) != null) {
-                    String name = tarArchiveEntry.getName();
-                    File tarFile = new File(parentFile, name);
-                    logger.info("start to download file " + tarFile.getName());
-                    BufferedOutputStream bos = null;
-                    try {
-                        bos = new BufferedOutputStream(new FileOutputStream(tarFile));
-                        int read = -1;
-                        byte[] buffer = new byte[1024];
-                        while ((read = tais.read(buffer)) != -1) {
-                            bos.write(buffer, 0, read);
-                        }
-                        logger.info("download file " + tarFile.getName() + " end!");
-                    } finally {
-                        IOUtils.closeQuietly(bos);
-                    }
-                }
-                tais.close();
-            } else {
-                File file = new File(parentFile, fileName);
-                if (!file.isFile()) {
-                    file.createNewFile();
+        request.setRdsInstanceId(dbInstanceId);
+        request.setAccessKeyId(ak);
+        request.setAccessKeySecret(sk);
+        int retryTime = 3;
+        while (true) {
+            try {
+                return request.doAction();
+            } catch (Exception e) {
+                if (retryTime-- <= 0) {
+                    throw new RuntimeException(e);
                 }
                 try {
-                    fos = new FileOutputStream(file);
-                    byte[] buffer = new byte[1024];
-                    int len;
-                    long copySize = 0;
-                    long nextPrintProgress = 0;
-                    logger.info("start to download file " + file.getName());
-                    while ((len = is.read(buffer)) != -1) {
-                        fos.write(buffer, 0, len);
-                        copySize += len;
-                        long progress = copySize * 100 / totalSize;
-                        if (progress >= nextPrintProgress) {
-                            logger.info("download " + file.getName() + " progress : " + progress
-                                        + "% , download size : " + copySize + ", total size : " + totalSize);
-                            nextPrintProgress += 10;
-                        }
-                    }
-                    logger.info("download file " + file.getName() + " end!");
-                    fos.flush();
-                } finally {
-                    IOUtils.closeQuietly(fos);
+                    Thread.sleep(100L);
+                } catch (InterruptedException e1) {
                 }
             }
-        } finally {
-            IOUtils.closeQuietly(fos);
-        }
-    }
-
-    public static String describeBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
-                                             Date endTime, int pageSize, int pageNumber) throws Exception {
-        Map<String, String> paramMap = new HashMap<String, String>();
-        paramMap.put("Action", "DescribeBinlogFiles");
-        paramMap.put("DBInstanceId", dbInstanceId); // rds实例id
-        paramMap.put("StartTime", formatIso8601Date(startTime));
-        paramMap.put("EndTime", formatIso8601Date(endTime));
-        paramMap.put("PageSize", String.valueOf(pageSize));
-        paramMap.put("PageNumber", String.valueOf(pageNumber));
-        return doRequest(url, paramMap, ak, sk);
-    }
-
-    private static String doRequest(String domin, Map<String, String> param, String ak, String sk) throws Exception {
-        param.put("AccessKeyId", ak);
-        param.put("SignatureMethod", "HMAC-SHA1");
-        param.put("SignatureVersion", SIGNATURE_VERSION);
-        param.put("Version", API_VERSION);
-        param.put("SignatureNonce", UUID.randomUUID().toString());
-        param.put("Format", "JSON");
-        param.put("Timestamp", formatIso8601Date(new Date()));
-        String signStr = generate("POST", param, sk);
-        param.put("Signature", signStr);
-        String request = concatQueryString(param);
-        String url = domin + "?" + request;
-        String result = HttpHelper.post(url, null, Collections.EMPTY_MAP, TIMEOUT);
-        return result;
-    }
-
-    public static String concatQueryString(Map<String, String> parameters) throws UnsupportedEncodingException {
-        if (null == parameters) {
-            return null;
-        }
-        StringBuilder urlBuilder = new StringBuilder("");
-        for (Map.Entry<String, String> entry : parameters.entrySet()) {
-            String key = entry.getKey();
-            String val = entry.getValue();
-            urlBuilder.append(encode(key));
-            if (val != null) {
-                urlBuilder.append("=").append(encode(val));
-            }
-            urlBuilder.append("&");
-        }
-        int strIndex = urlBuilder.length();
-        if (parameters.size() > 0) {
-            urlBuilder.deleteCharAt(strIndex - 1);
-        }
-        return urlBuilder.toString();
-    }
-
-    public static String encode(String value) throws UnsupportedEncodingException {
-        return URLEncoder.encode(value, "UTF-8");
-    }
-
-    private static String formatIso8601Date(Date date) {
-        SimpleDateFormat df = new SimpleDateFormat(ISO8601_DATE_FORMAT);
-        df.setTimeZone(TimeZone.getTimeZone("GMT"));
-        return df.format(date);
-    }
-
-    /**
-     * 使用 HMAC-SHA1 签名方法对对encryptText进行签名
-     *
-     * @param encryptText 被签名的字符串
-     * @param encryptKey 密钥
-     * @return
-     * @throws Exception
-     */
-    public static byte[] HmacSHA1Encrypt(String encryptText, String encryptKey) throws Exception {
-        byte[] data = encryptKey.getBytes(ENCODING);
-        // 根据给定的字节数组构造一个密钥,第二参数指定一个密钥算法的名称
-        SecretKey secretKey = new SecretKeySpec(data, MAC_NAME);
-        // 生成一个指定 Mac 算法 的 Mac 对象
-        Mac mac = Mac.getInstance(MAC_NAME);
-        // 用给定密钥初始化 Mac 对象
-        mac.init(secretKey);
-
-        byte[] text = encryptText.getBytes(ENCODING);
-        // 完成 Mac 操作
-        return mac.doFinal(text);
-    }
-
-    private static String base64(byte input[]) throws UnsupportedEncodingException {
-        return new String(Base64.encodeBase64(input), ENCODING);
-    }
-
-    /** 对参数名称和参数值进行URL编码 **/
-    public static String generate(String method, Map<String, String> parameter, String accessKeySecret)
-                                                                                                       throws Exception {
-        String signString = generateSignString(method, parameter);
-        byte[] signBytes = HmacSHA1Encrypt(signString, accessKeySecret + "&");
-        String signature = base64(signBytes);
-        if ("POST".equals(method)) {
-            return signature;
-        }
-        return URLEncoder.encode(signature, "UTF-8");
-    }
-
-    private static String generateQueryString(TreeMap<String, String> treeMap) {
-        StringBuilder canonicalizedQueryString = new StringBuilder();
-        boolean first = true;
-        for (String key : treeMap.navigableKeySet()) {
-            String value = treeMap.get(key);
-            if (!first) {
-                canonicalizedQueryString.append("&");
-            }
-            first = false;
-            canonicalizedQueryString.append(percentEncode(key)).append("=").append(percentEncode(value));
-        }
-        return canonicalizedQueryString.toString();
-    }
-
-    public static String generateSignString(String httpMethod, Map<String, String> parameter) throws IOException {
-        TreeMap<String, String> sortParameter = new TreeMap<String, String>();
-        sortParameter.putAll(parameter);
-        String canonicalizedQueryString = generateQueryString(sortParameter);
-        if (null == httpMethod) {
-            throw new RuntimeException("httpMethod can not be empty");
-        }
-        /** 构造待签名的字符串* */
-        StringBuilder stringToSign = new StringBuilder();
-        stringToSign.append(httpMethod).append("&");
-        stringToSign.append(percentEncode("/")).append("&");
-        stringToSign.append(percentEncode(canonicalizedQueryString));
-        return stringToSign.toString();
-    }
-
-    public static String percentEncode(String value) {
-        try {
-            return value == null ? null : URLEncoder.encode(value, ENCODING)
-                .replaceAll("\\+", "%20")
-                .replaceAll("\\*", "%2A")
-                .replaceAll("%7E", "~");
-        } catch (Exception e) {
         }
-        return "";
     }
 }

+ 20 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

@@ -28,15 +28,15 @@ import com.alibaba.otter.canal.protocol.position.LogPosition;
  */
 public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements CanalEventParser, LocalBinLogConnection.FileParserListener {
 
-    private String              url = "https://rds.aliyuncs.com/"; // openapi地址
-    private String              accesskey;                        // 云账号的ak
-    private String              secretkey;                        // 云账号sk
-    private String              instanceId;                       // rds实例id
+    private String              url;                // openapi地址
+    private String              accesskey;          // 云账号的ak
+    private String              secretkey;          // 云账号sk
+    private String              instanceId;         // rds实例id
     private Long                startTime;
     private Long                endTime;
     private BinlogDownloadQueue binlogDownloadQueue;
     private ParseFinishListener finishListener;
-    private int                 batchSize;
+    private int                 batchFileSize;
 
     public RdsLocalBinlogEventParser(){
     }
@@ -65,10 +65,9 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
                 instanceId,
                 new Date(startTime),
                 new Date(endTime));
-            binlogDownloadQueue = new BinlogDownloadQueue(binlogFiles, batchSize, directory);
+            binlogDownloadQueue = new BinlogDownloadQueue(binlogFiles, batchFileSize, directory);
             binlogDownloadQueue.silenceDownload();
             needWait = true;
-            parallel = false;
             // try to download one file,use to test server id
             binlogDownloadQueue.tryOne();
         } catch (Throwable e) {
@@ -91,10 +90,12 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
             binlogDownloadQueue.notifyNotMatch();
             try {
                 binlogDownloadQueue.cleanDir();
+                binlogDownloadQueue.tryOne();
                 binlogDownloadQueue.prepare();
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 throw new RuntimeException(e);
             }
+
             try {
                 binlogDownloadQueue.execute(new Runnable() {
 
@@ -163,8 +164,10 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
             }
             // 处理下logManager位点问题
             LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
-            EntryPosition position = logPosition.getPostion();
-            if (position != null) {
+            Long timestamp = 0L;
+            if (logPosition != null && logPosition.getPostion() != null) {
+                timestamp = logPosition.getPostion().getTimestamp();
+                EntryPosition position = logPosition.getPostion();
                 LogPosition newLogPosition = new LogPosition();
                 String journalName = position.getJournalName();
                 int sepIdx = journalName.indexOf(".");
@@ -181,9 +184,13 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
             }
 
             if (binlogDownloadQueue.isLastFile(fileName)) {
-                logger.info("all file parse complete, switch to mysql parser!");
+                logger.warn("last file : " + fileName + " , timestamp : " + timestamp
+                            + " , all file parse complete, switch to mysql parser!");
                 finishListener.onFinish();
                 return;
+            } else {
+                logger.warn("parse local binlog file : " + fileName + " , timestamp : " + timestamp
+                            + " , try the next binlog !");
             }
             binlogDownloadQueue.prepare();
         } catch (Exception e) {
@@ -207,7 +214,7 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
         void onFinish();
     }
 
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
+    public void setBatchFileSize(int batchFileSize) {
+        this.batchFileSize = batchFileSize;
     }
 }

+ 9 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/BinlogFile.java

@@ -6,7 +6,7 @@ package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
  */
 public class BinlogFile {
 
-    private Long FileSize;
+    private Long   FileSize;
     private String LogBeginTime;
     private String LogEndTime;
     private String DownloadLink;
@@ -69,4 +69,12 @@ public class BinlogFile {
     public void setFileName(String fileName) {
         this.fileName = fileName;
     }
+
+    @Override
+    public String toString() {
+        return "BinlogFile [FileSize=" + FileSize + ", LogBeginTime=" + LogBeginTime + ", LogEndTime=" + LogEndTime
+               + ", DownloadLink=" + DownloadLink + ", HostInstanceID=" + HostInstanceID + ", LinkExpiredTime="
+               + LinkExpiredTime + ", fileName=" + fileName + "]";
+    }
+
 }

+ 6 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/DescribeBinlogFileResult.java

@@ -5,12 +5,13 @@ package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
  * @since 1.0.25
  */
 public class DescribeBinlogFileResult {
+
     private RdsItem Items;
-    private long PageNumber;
-    private long TotalRecordCount;
-    private long TotalFileSize;
-    private String RequestId;
-    private long PageRecordCount;
+    private long    PageNumber;
+    private long    TotalRecordCount;
+    private long    TotalFileSize;
+    private String  RequestId;
+    private long    PageRecordCount;
 
     public RdsItem getItems() {
         return Items;

+ 14 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsBackupPolicy.java

@@ -9,15 +9,16 @@ public class RdsBackupPolicy {
     /**
      * 数据备份保留天数(7到730天)。
      */
-    private String BackupRetentionPeriod;
+    private String  BackupRetentionPeriod;
     /**
      * 数据备份时间,格式:HH:mmZ- HH:mm Z。
      */
-    private String PreferredBackupTime;
+    private String  PreferredBackupTime;
     /**
-     * 数据备份周期。Monday:周一;Tuesday:周二;Wednesday:周三;Thursday:周四;Friday:周五;Saturday:周六;Sunday:周日。
+     * 数据备份周期。Monday:周一;Tuesday:周二;Wednesday:周三;Thursday:周四;Friday:周五;Saturday:
+     * 周六;Sunday:周日。
      */
-    private String PreferredBackupPeriod;
+    private String  PreferredBackupPeriod;
     /**
      * 日志备份状态。Enable:开启;Disabled:关闭。
      */
@@ -25,7 +26,7 @@ public class RdsBackupPolicy {
     /**
      * 日志备份保留天数(7到730天)。
      */
-    private int LogBackupRetentionPeriod;
+    private int     LogBackupRetentionPeriod;
 
     public String getBackupRetentionPeriod() {
         return BackupRetentionPeriod;
@@ -66,4 +67,12 @@ public class RdsBackupPolicy {
     public void setLogBackupRetentionPeriod(int logBackupRetentionPeriod) {
         LogBackupRetentionPeriod = logBackupRetentionPeriod;
     }
+
+    @Override
+    public String toString() {
+        return "RdsBackupPolicy [BackupRetentionPeriod=" + BackupRetentionPeriod + ", PreferredBackupTime="
+               + PreferredBackupTime + ", PreferredBackupPeriod=" + PreferredBackupPeriod + ", BackupLog=" + BackupLog
+               + ", LogBackupRetentionPeriod=" + LogBackupRetentionPeriod + "]";
+    }
+
 }

+ 7 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsItem.java

@@ -7,6 +7,7 @@ import java.util.List;
  * @since 1.0.25
  */
 public class RdsItem {
+
     private List<BinlogFile> BinLogFile;
 
     public List<BinlogFile> getBinLogFile() {
@@ -16,4 +17,10 @@ public class RdsItem {
     public void setBinLogFile(List<BinlogFile> binLogFile) {
         BinLogFile = binLogFile;
     }
+
+    @Override
+    public String toString() {
+        return "RdsItem [BinLogFile=" + BinLogFile + "]";
+    }
+
 }

+ 35 - 34
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java

@@ -1,12 +1,18 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.Date;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import javax.crypto.Mac;
@@ -32,8 +38,6 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.ssl.SSLContexts;
 import org.apache.http.util.EntityUtils;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
-
 /**
  * @author chengjin.lyf on 2018/8/7 下午2:26
  * @since 1.0.25
@@ -49,42 +53,38 @@ public abstract class AbstractRequest<T> {
      */
     private static final String MAC_NAME = "HmacSHA1";
 
-    private String accessKeyId;
+    private String              accessKeyId;
 
-    private String accessKeySecret;
+    private String              accessKeySecret;
 
     /**
-     *  api 版本
-     *
+     * api 版本
      */
-    private String version;
+    private String              version;
 
-    private String endPoint = "rds.aliyuncs.com";
+    private String              endPoint = "rds.aliyuncs.com";
 
-    private String protocol = "http";
+    private String              protocol = "http";
 
     public void setProtocol(String protocol) {
         this.protocol = protocol;
     }
 
-    private int timeout = (int) TimeUnit.MINUTES.toMillis(1);
-
+    private int                 timeout = (int) TimeUnit.MINUTES.toMillis(1);
 
     private Map<String, String> treeMap = new TreeMap();
 
-    public void putQueryString(String name, String value){
-        if (StringUtils.isBlank(name) || StringUtils.isBlank(value)){
+    public void putQueryString(String name, String value) {
+        if (StringUtils.isBlank(name) || StringUtils.isBlank(value)) {
             return;
         }
         treeMap.put(name, value);
     }
 
-
     public void setVersion(String version) {
         this.version = version;
     }
 
-
     public void setEndPoint(String endPoint) {
         this.endPoint = endPoint;
     }
@@ -169,7 +169,7 @@ public abstract class AbstractRequest<T> {
         p.put("Format", "JSON");
         p.put("Version", version);
         p.put("AccessKeyId", accessKeyId);
-        p.put("SignatureMethod", "HMAC-SHA1"); //此处不能用变量 MAC_NAME
+        p.put("SignatureMethod", "HMAC-SHA1"); // 此处不能用变量 MAC_NAME
         p.put("Timestamp", formatUTCTZ(new Date()));
         p.put("SignatureVersion", "1.0");
         p.put("SignatureNonce", UUID.randomUUID().toString());
@@ -193,6 +193,7 @@ public abstract class AbstractRequest<T> {
      * @return
      * @throws IOException
      */
+    @SuppressWarnings("deprecation")
     private final HttpResponse executeHttpRequest(HttpGet getMethod, String host) throws Exception {
         SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() {
 
@@ -202,24 +203,24 @@ public abstract class AbstractRequest<T> {
             }
         }).build();
         SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext,
-                new String[] { "TLSv1" },
-                null,
-                SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+            new String[] { "TLSv1" },
+            null,
+            SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
         Registry registry = RegistryBuilder.create()
-                .register("http", PlainConnectionSocketFactory.INSTANCE)
-                .register("https", sslsf)
-                .build();
+            .register("http", PlainConnectionSocketFactory.INSTANCE)
+            .register("https", sslsf)
+            .build();
         HttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(registry);
         CloseableHttpClient httpClient = HttpClientBuilder.create()
-                .setMaxConnPerRoute(50)
-                .setMaxConnTotal(100)
-                .setConnectionManager(httpClientConnectionManager)
-                .build();
+            .setMaxConnPerRoute(50)
+            .setMaxConnTotal(100)
+            .setConnectionManager(httpClientConnectionManager)
+            .build();
         RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout(timeout)
-                .setConnectionRequestTimeout(timeout)
-                .setSocketTimeout(timeout)
-                .build();
+            .setConnectTimeout(timeout)
+            .setConnectionRequestTimeout(timeout)
+            .setSocketTimeout(timeout)
+            .build();
         getMethod.setConfig(requestConfig);
         HttpResponse response = httpClient.execute(getMethod);
         int statusCode = response.getStatusLine().getStatusCode();
@@ -232,14 +233,14 @@ public abstract class AbstractRequest<T> {
 
     protected abstract T processResult(HttpResponse response) throws Exception;
 
-    protected void processBefore(){
+    protected void processBefore() {
 
     }
 
-    public final T  doAction() throws Exception {
+    public final T doAction() throws Exception {
         processBefore();
         String requestStr = makeRequestString(treeMap);
-        HttpGet httpGet = new HttpGet(protocol + "://" +endPoint + "?" + requestStr);
+        HttpGet httpGet = new HttpGet(protocol + "://" + endPoint + "?" + requestStr);
         HttpResponse response = executeHttpRequest(httpGet, endPoint);
         if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
             String result = EntityUtils.toString(response.getEntity());

+ 3 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBackupPolicyRequest.java

@@ -1,27 +1,26 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsBackupPolicy;
 import org.apache.http.HttpResponse;
 import org.apache.http.util.EntityUtils;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsBackupPolicy;
 
 /**
  * rds 备份策略查询
+ * 
  * @author chengjin.lyf on 2018/8/7 下午3:41
  * @since 1.0.25
  */
 public class DescribeBackupPolicyRequest extends AbstractRequest<RdsBackupPolicy> {
 
-
-    public DescribeBackupPolicyRequest() {
+    public DescribeBackupPolicyRequest(){
         setVersion("2014-08-15");
         putQueryString("Action", "DescribeBackupPolicy");
 
     }
 
-
     public void setRdsInstanceId(String rdsInstanceId) {
         putQueryString("DBInstanceId", rdsInstanceId);
     }

+ 7 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBinlogFilesRequest.java

@@ -2,12 +2,12 @@ package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
 
 import java.util.Date;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
 import org.apache.http.HttpResponse;
 import org.apache.http.util.EntityUtils;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
 
 /**
  * @author chengjin.lyf on 2018/8/7 下午3:41
@@ -15,11 +15,9 @@ import com.alibaba.fastjson.TypeReference;
  */
 public class DescribeBinlogFilesRequest extends AbstractRequest<DescribeBinlogFileResult> {
 
-
-    public DescribeBinlogFilesRequest() {
+    public DescribeBinlogFilesRequest(){
         setVersion("2014-08-15");
         putQueryString("Action", "DescribeBinlogFiles");
-
     }
 
     public void setRdsInstanceId(String rdsInstanceId) {
@@ -35,11 +33,11 @@ public class DescribeBinlogFilesRequest extends AbstractRequest<DescribeBinlogFi
     }
 
     public void setStartDate(Date startDate) {
-        putQueryString("StartTime" , formatUTCTZ(startDate));
+        putQueryString("StartTime", formatUTCTZ(startDate));
     }
 
     public void setEndDate(Date endDate) {
-        putQueryString("EndTime" , formatUTCTZ(endDate));
+        putQueryString("EndTime", formatUTCTZ(endDate));
     }
 
     public void setResourceOwnerId(Long resourceOwnerId) {
@@ -49,8 +47,9 @@ public class DescribeBinlogFilesRequest extends AbstractRequest<DescribeBinlogFi
     @Override
     protected DescribeBinlogFileResult processResult(HttpResponse response) throws Exception {
         String result = EntityUtils.toString(response.getEntity());
-        DescribeBinlogFileResult describeBinlogFileResult = JSONObject.parseObject(result, new TypeReference<DescribeBinlogFileResult>() {
-        });
+        DescribeBinlogFileResult describeBinlogFileResult = JSONObject.parseObject(result,
+            new TypeReference<DescribeBinlogFileResult>() {
+            });
         return describeBinlogFileResult;
     }
 }

+ 2 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -68,7 +68,8 @@ public class MemoryTableMeta implements TableMetaTSDB {
 
             try {
                 // druid暂时flush privileges语法解析有问题
-                if (!StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "flush")) {
+                if (!StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "flush")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "grant")) {
                     repository.console(ddl);
                 }
             } catch (Throwable e) {

+ 11 - 10
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java

@@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy;
 import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
@@ -28,10 +29,9 @@ public class RdsBinlogEventParserProxyTest {
     private static final String MYSQL_ADDRESS = "";
     private static final String USERNAME      = "";
     private static final String PASSWORD      = "";
-    public static final String DBNAME = "";
-    public static final String TBNAME = "";
-    public static final String DDL = "";
-
+    public static final String  DBNAME        = "";
+    public static final String  TBNAME        = "";
+    public static final String  DDL           = "";
 
     @Test
     public void test_timestamp() throws InterruptedException {
@@ -41,7 +41,7 @@ public class RdsBinlogEventParserProxyTest {
 
         final RdsBinlogEventParserProxy controller = new RdsBinlogEventParserProxy();
         Calendar calendar = Calendar.getInstance();
-        calendar.add(Calendar.DAY_OF_YEAR, -1);
+        calendar.add(Calendar.HOUR_OF_DAY, -24 * 4);
         final EntryPosition defaultPosition = buildPosition(null, null, calendar.getTimeInMillis());
         controller.setSlaveId(3344L);
         controller.setDetectingEnable(false);
@@ -51,13 +51,15 @@ public class RdsBinlogEventParserProxyTest {
         controller.setInstanceId("");
         controller.setAccesskey("");
         controller.setSecretkey("");
-        controller.setBatchSize(4);
-//        controller.setRdsOpenApiUrl("https://rds.aliyuncs.com/");
+        controller.setDirectory("/tmp/binlog");
+        controller.setEventBlackFilter(new AviaterRegexFilter("mysql\\.*"));
+        controller.setFilterTableError(true);
+        controller.setBatchFileSize(4);
         controller.setEventSink(new AbstractCanalEventSinkTest<List<CanalEntry.Entry>>() {
 
             @Override
             public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
-                    throws CanalSinkException {
+                                                                                                                   throws CanalSinkException {
                 for (CanalEntry.Entry entry : entrys) {
                     if (entry.getEntryType() != CanalEntry.EntryType.HEARTBEAT) {
                         entryCount.incrementAndGet();
@@ -79,8 +81,8 @@ public class RdsBinlogEventParserProxyTest {
         controller.setLogPositionManager(new AbstractLogPositionManager() {
 
             private LogPosition logPosition;
+
             public void persistLogPosition(String destination, LogPosition logPosition) {
-                System.out.println(logPosition);
                 this.logPosition = logPosition;
             }
 
@@ -105,7 +107,6 @@ public class RdsBinlogEventParserProxyTest {
         Assert.assertTrue(entryPosition.getTimestamp() <= defaultPosition.getTimestamp());
     }
 
-
     // ======================== helper method =======================
 
     private EntryPosition buildPosition(String binlogFile, Long offest, Long timestamp) {

+ 17 - 10
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogOpenApiTest.java

@@ -1,11 +1,14 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
-import java.io.File;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.commons.lang.time.DateUtils;
+import org.junit.Test;
 
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogOpenApi;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsBackupPolicy;
 
 /**
  * @author agapple 2017年10月15日 下午2:14:34
@@ -13,15 +16,19 @@ import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogOpenApi;
  */
 public class RdsBinlogOpenApiTest {
 
+    @Test
     public void testSimple() throws Throwable {
-        Date startTime = DateUtils.parseDate("2017-10-13 20:56:58", new String[] { "yyyy-MM-dd HH:mm:ss" });
-        Date endTime = DateUtils.parseDate("2017-10-14 02:57:59", new String[] { "yyyy-MM-dd HH:mm:ss" });
-        RdsBinlogOpenApi.downloadBinlogFiles("https://rds.aliyuncs.com/",
-            "",
-            "",
-            "rm-bp180v4mfjnm157es",
-            startTime,
-            endTime,
-            new File("/tmp/binlog/"));
+        Date startTime = DateUtils.parseDate("2018-08-10 12:00:00", new String[] { "yyyy-MM-dd HH:mm:ss" });
+        Date endTime = DateUtils.parseDate("2018-08-11 12:00:00", new String[] { "yyyy-MM-dd HH:mm:ss" });
+        String url = "https://rds.aliyuncs.com/";
+        String ak = "";
+        String sk = "";
+        String dbInstanceId = "";
+
+        RdsBackupPolicy backupPolicy = RdsBinlogOpenApi.queryBinlogBackupPolicy(url, ak, sk, dbInstanceId);
+        System.out.println(backupPolicy);
+
+        List<BinlogFile> binlogFiles = RdsBinlogOpenApi.listBinlogFiles(url, ak, sk, dbInstanceId, startTime, endTime);
+        System.out.println(binlogFiles);
     }
 }