Parcourir la source

Merge branch 'master' into master

agapple il y a 8 ans
Parent
commit
36216e63ea
42 fichiers modifiés avec 1446 ajouts et 502 suppressions
  1. 1 1
      client/pom.xml
  2. 1 1
      common/pom.xml
  3. 30 0
      common/src/main/java/com/alibaba/otter/canal/common/utils/JsonUtils.java
  4. 4 0
      common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java
  5. 1 1
      dbsync/pom.xml
  6. 7 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java
  7. 1 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RotateLogEvent.java
  8. 1 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java
  9. 1 1
      deployer/pom.xml
  10. 10 4
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  11. 1 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java
  12. 163 0
      deployer/src/main/resources/spring/local-instance.xml
  13. 1 1
      driver/pom.xml
  14. 2 1
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java
  15. 1 1
      example/pom.xml
  16. 1 1
      filter/pom.xml
  17. 1 1
      instance/core/pom.xml
  18. 1 1
      instance/manager/pom.xml
  19. 1 1
      instance/pom.xml
  20. 1 1
      instance/spring/pom.xml
  21. 1 1
      meta/pom.xml
  22. 1 1
      meta/src/main/java/com/alibaba/otter/canal/meta/PeriodMixedMetaManager.java
  23. 1 1
      parse/pom.xml
  24. 5 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  25. 12 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java
  26. 19 13
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  27. 66 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java
  28. 37 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  29. 8 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  30. 54 22
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  31. 17 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
  32. 5 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java
  33. 73 56
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParserTest.java
  34. 2 2
      pom.xml
  35. 1 1
      protocol/pom.xml
  36. 477 177
      protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java
  37. 431 188
      protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalPacket.java
  38. 1 1
      server/pom.xml
  39. 2 0
      server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java
  40. 1 1
      server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java
  41. 1 1
      sink/pom.xml
  42. 1 1
      store/pom.xml

+ 1 - 1
client/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
common/pom.xml

@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<artifactId>canal.common</artifactId>

+ 30 - 0
common/src/main/java/com/alibaba/otter/canal/common/utils/JsonUtils.java

@@ -1,12 +1,19 @@
 package com.alibaba.otter.canal.common.utils;
 
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.List;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.TypeReference;
 import com.alibaba.fastjson.serializer.JSONSerializer;
+import com.alibaba.fastjson.serializer.ObjectSerializer;
 import com.alibaba.fastjson.serializer.PropertyFilter;
+import com.alibaba.fastjson.serializer.SerializeConfig;
 import com.alibaba.fastjson.serializer.SerializeWriter;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 
@@ -17,6 +24,12 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
  */
 public class JsonUtils {
 
+    static {
+        SerializeConfig.getGlobalInstance().put(InetAddress.class, InetAddressSerializer.instance);
+        SerializeConfig.getGlobalInstance().put(Inet4Address.class, InetAddressSerializer.instance);
+        SerializeConfig.getGlobalInstance().put(Inet6Address.class, InetAddressSerializer.instance);
+    }
+
     public static <T> T unmarshalFromByte(byte[] bytes, Class<T> targetClass) {
         return (T) JSON.parseObject(bytes, targetClass);// 默认为UTF-8
     }
@@ -70,4 +83,21 @@ public class JsonUtils {
             out.close();
         }
     }
+
+    public static class InetAddressSerializer implements ObjectSerializer {
+
+        public static InetAddressSerializer instance = new InetAddressSerializer();
+
+        public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType)
+                                                                                                     throws IOException {
+            if (object == null) {
+                serializer.writeNull();
+                return;
+            }
+
+            InetAddress address = (InetAddress) object;
+            // 优先使用name
+            serializer.write(address.getHostName());
+        }
+    }
 }

+ 4 - 0
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java

@@ -87,6 +87,10 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
 
     }
 
+    public void init() {
+        processStart();
+    }
+
     public void start() {
         super.start();
         processStart();

+ 1 - 1
dbsync/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 7 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java

@@ -140,10 +140,15 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
                 throw new IOException("Unknown binlog version: " + binlogVersion);
         }
     }
-
+    
+    public FormatDescriptionLogEvent(final int binlogVersion,int binlogChecksum){
+    	this(binlogVersion);
+    	this.header.checksumAlg = binlogChecksum;
+    }
+    
     public FormatDescriptionLogEvent(final int binlogVersion){
         this.binlogVersion = binlogVersion;
-
+        
         postHeaderLen = new short[ENUM_END_EVENT];
         /* identify binlog format */
         switch (binlogVersion) {

+ 1 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RotateLogEvent.java

@@ -90,6 +90,7 @@ public final class RotateLogEvent extends LogEvent {
         int filenameLen = buffer.limit() - filenameOffset;
         if (filenameLen > FN_REFLEN - 1) filenameLen = FN_REFLEN - 1;
         buffer.position(filenameOffset);
+        
         filename = buffer.getFixString(filenameLen);
     }
 

+ 1 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -957,11 +957,10 @@ public final class RowsLogBuffer {
                 jsonValue.toJsonString(builder);
                 value = builder.toString();
                 buffer.position(position + len);
-
                 // byte[] binary = new byte[len];
                 // buffer.fillBytes(binary, 0, len);
                 // value = binary;
-                javaType = Types.VARBINARY;
+                javaType = Types.VARCHAR;
                 length = len;
                 break;
             }

+ 1 - 1
deployer/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 10 - 4
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -5,6 +5,7 @@ import java.util.Properties;
 
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -172,6 +173,8 @@ public class CanalController {
                 if (zkclientx != null) {
                     runningMonitor.setZkClient(zkclientx);
                 }
+                // 触发创建一下cid节点
+                runningMonitor.init();
                 return runningMonitor;
             }
         }));
@@ -189,10 +192,10 @@ public class CanalController {
                         instanceConfigs.put(destination, config);
                     }
 
-                    if (!config.getLazy() && !embededCanalServer.isStart(destination)) {
+                    if (!embededCanalServer.isStart(destination)) {
                         // HA机制启动
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
-                        if (!runningMonitor.isStart()) {
+                        if (!config.getLazy() && !runningMonitor.isStart()) {
                             runningMonitor.start();
                         }
                     }
@@ -389,10 +392,10 @@ public class CanalController {
             final String destination = entry.getKey();
             InstanceConfig config = entry.getValue();
             // 创建destination的工作节点
-            if (!config.getLazy() && !embededCanalServer.isStart(destination)) {
+            if (!embededCanalServer.isStart(destination)) {
                 // HA机制启动
                 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
-                if (!runningMonitor.isStart()) {
+                if (!config.getLazy() && !runningMonitor.isStart()) {
                     runningMonitor.start();
                 }
             }
@@ -448,6 +451,9 @@ public class CanalController {
                 String parentDir = path.substring(0, path.lastIndexOf('/'));
                 zkclientx.createPersistent(parentDir, true);
                 zkclientx.createEphemeral(path);
+            } catch (ZkNodeExistsException e) {
+                // ignore
+                // 因为第一次启动时创建了cid,但在stop/start的时可能会关闭和新建,允许出现NodeExists问题s
             }
 
         }

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/SpringInstanceConfigMonitor.java

@@ -175,6 +175,7 @@ public class SpringInstanceConfigMonitor extends AbstractCanalLifeCycle implemen
         InstanceAction action = actions.remove(destination);
         try {
             action.stop(destination);
+            lastFiles.remove(destination);
             logger.info("auto notify stop {} successful.", destination);
         } catch (Throwable e) {
             logger.error("scan delete found[{}] but stop failed", destination, ExceptionUtils.getFullStackTrace(e));

+ 163 - 0
deployer/src/main/resources/spring/local-instance.xml

@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
+	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
+	xmlns:context="http://www.springframework.org/schema/context"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
+           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
+           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
+           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
+	default-autowire="byName">
+
+	<!-- properties -->
+	<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
+		<property name="ignoreResourceNotFound" value="true" />
+		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
+		<property name="locationNames">
+			<list>
+				<value>classpath:canal.properties</value>
+				<value>classpath:${canal.instance.destination:}/instance.properties</value>
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
+	<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> 
+		<property name="propertyEditorRegistrars">
+			<list>
+				<ref bean="socketAddressEditor" />
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
+		<property name="destination" value="${canal.instance.destination}" />
+		<property name="eventParser">
+			<ref local="eventParser" />
+		</property>
+		<property name="eventSink">
+			<ref local="eventSink" />
+		</property>
+		<property name="eventStore">
+			<ref local="eventStore" />
+		</property>
+		<property name="metaManager">
+			<ref local="metaManager" />
+		</property>
+		<property name="alarmHandler">
+			<ref local="alarmHandler" />
+		</property>
+		<property name="eventFetcher">
+			<ref local="eventFetcher"/>
+		</property>
+	</bean>
+	
+	<bean id="eventFetcher" class="com.alibaba.otter.canal.mq.fetch.CanalEventFetcherWithMq">
+		<property name="enable" value="${canal.instance.mq.enable}"/>
+		<property name="sendMode" value="${canal.instance.mq.sendMode}"/>
+	</bean>
+	
+	<!-- 报警处理类 -->
+	<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.MsgAlarmHandler">
+		<property name="enabled" value="${canal.alarm.enable}" />
+		<property name="environment" value="${canal.alarm.environment}" />
+		<property name="intercenterApi" value="${canal.alarm.intercenter.api}" />
+		<property name="administrators" value="${canal.alarm.administrators}" />
+	</bean>
+	
+	<bean id="zkClientx" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean" >
+		<property name="targetClass" value="com.alibaba.otter.canal.common.zookeeper.ZkClientx" />
+		<property name="targetMethod" value="getZkClient" />
+		<property name="arguments">
+			<list>
+				<value>${canal.zkServers:127.0.0.1:2181}</value>
+			</list>
+		</property>
+	</bean>
+	
+	<bean id="metaManager" class="com.alibaba.otter.canal.meta.PeriodMixedMetaManager">
+		<property name="zooKeeperMetaManager">
+			<bean class="com.alibaba.otter.canal.meta.ZooKeeperMetaManager">
+				<property name="zkClientx" ref="zkClientx" />
+			</bean>
+		</property>
+		<property name="period" value="${canal.zookeeper.flush.period:1000}" />
+	</bean>
+	
+	<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
+		<property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
+		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
+		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
+		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+	</bean>
+	
+	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
+		<property name="eventStore" ref="eventStore" />
+	</bean>
+
+	<bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser">
+		<property name="destination" value="${canal.instance.destination}" />			
+		<property name="alarmHandler" ref="alarmHandler" />
+		
+		<!-- 解析过滤处理 -->
+		<property name="eventFilter">
+			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
+				<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
+			</bean>
+		</property>
+		
+		<property name="eventBlackFilter">
+			<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
+				<constructor-arg index="0" value="${canal.instance.filter.black.regex:}" />
+				<constructor-arg index="1" value="false" />
+			</bean>
+		</property>
+		
+		<!-- 最大事务解析大小,超过该大小后事务将被切分为多个事务投递 -->
+		<property name="transactionSize" value="${canal.instance.transaction.size:1024}" />
+			
+		<!-- 解析编码 -->
+		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
+	
+		<!-- 解析位点记录 -->
+		<property name="logPositionManager">
+			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
+				<property name="primary">
+					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
+				</property>
+				<property name="failback">
+					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
+						<property name="metaManager" ref="metaManager" />
+					</bean>
+				</property>
+			</bean>
+		</property>
+		
+		<!-- 解析数据库信息 -->
+		<property name="masterInfo">
+			<bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo">
+				<property name="address" value="${canal.instance.master.address}" />
+				<property name="username" value="${canal.instance.dbUsername:retl}" />
+				<property name="password" value="${canal.instance.dbPassword:retl}" />
+				<property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:retl}" />
+			</bean>
+		</property>
+		
+		<!-- 解析起始位点 -->
+		<property name="masterPosition">
+			<bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
+				<property name="journalName" value="${canal.instance.master.journal.name}" />
+				<property name="position" value="${canal.instance.master.position}" />
+				<property name="timestamp" value="${canal.instance.master.timestamp}" />
+			</bean>
+		</property>
+		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
+		<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
+		<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
+		<property name="filterRows" value="${canal.instance.filter.rows:false}" />
+		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
+		<property name="needWait" value="${canal.instance.parser.needWait:true}"/>
+		<property name="directory" value="${canal.instance.parser.directory}"/>
+	</bean>
+</beans>

+ 1 - 1
driver/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 2 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -101,7 +101,8 @@ public class MysqlConnector {
                     MysqlUpdateExecutor executor = new MysqlUpdateExecutor(connector);
                     executor.update("KILL CONNECTION " + connectionId);
                 } catch (Exception e) {
-                    throw new IOException("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
+                    // 忽略具体异常
+                    logger.info("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
                 } finally {
                     if (connector != null) {
                         connector.disconnect();

+ 1 - 1
example/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
filter/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
instance/core/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../../pom.xml</relativePath>
 	</parent>
 	<artifactId>canal.instance.core</artifactId>

+ 1 - 1
instance/manager/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
instance/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
instance/spring/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
meta/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
meta/src/main/java/com/alibaba/otter/canal/meta/PeriodMixedMetaManager.java

@@ -142,8 +142,8 @@ public class PeriodMixedMetaManager extends MemoryMetaManager implements CanalMe
     }
 
     public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
-        updateCursorTasks.add(clientIdentity);// 添加到任务队列中进行触发
         super.updateCursor(clientIdentity, position);
+        updateCursorTasks.add(clientIdentity);// 添加到任务队列中进行触发
     }
 
     public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {

+ 1 - 1
parse/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<artifactId>canal.parse</artifactId>

+ 5 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -22,7 +22,6 @@ import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry;
@@ -325,7 +324,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
             startTs = System.currentTimeMillis();
         }
         CanalEntry.Entry event = binlogParser.parse(bod);
-
         if (enabled) {
             this.parsingInterval = System.currentTimeMillis() - startTs;
         }
@@ -388,7 +386,11 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
             String name = String.format("destination = %s , address = %s , HeartBeatTimeTask",
                 destination,
                 runningInfo == null ? null : runningInfo.getAddress().toString());
-            synchronized (MysqlEventParser.class) {
+            synchronized (AbstractEventParser.class) {
+                // synchronized (MysqlEventParser.class) {
+                // why use MysqlEventParser.class, u know, MysqlEventParser is
+                // the child class 4 AbstractEventParser,
+                // do this is ...
                 if (timer == null) {
                     timer = new Timer(name, true);
                 }

+ 12 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.inbound;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
@@ -44,6 +45,17 @@ public class TableMeta {
         this.fileds = fileds;
     }
 
+    public List<FieldMeta> getPrimaryFields() {
+        List<FieldMeta> primarys = new ArrayList<TableMeta.FieldMeta>();
+        for (FieldMeta meta : fileds) {
+            if (meta.isKey()) {
+                primarys.add(meta);
+            }
+        }
+
+        return primarys;
+    }
+
     public static class FieldMeta {
 
         private String columnName;

+ 19 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -8,7 +8,6 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
@@ -18,6 +17,7 @@ import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.LogPosition;
 import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 
 /**
  * local bin log connection (not real connection)
@@ -83,9 +83,9 @@ public class LocalBinLogConnection implements ErosaConnection {
             context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
             while (running) {
                 boolean needContinue = true;
-                LogEvent event;
-                while (fetcher.fetch()) {
-                    event = decoder.decode(fetcher, context);
+				LogEvent event = null;
+                L:while (fetcher.fetch()) {
+                    /*event = decoder.decode(fetcher, context);
                     if (event == null) {
                         throw new CanalParseException("parse failed");
                     }
@@ -93,15 +93,20 @@ public class LocalBinLogConnection implements ErosaConnection {
                     if (!func.sink(event)) {
                         needContinue = false;
                         break;
-                    }
-
-                    // do {
-                    // event = decoder.decode(fetcher, context);
-                    // if (event != null && !func.sink(event)) {
-                    // needContinue = false;
-                    // break;
-                    // }
-                    // } while (event != null);
+                    }*/
+
+					do {
+						if (event == null) {
+							event = new RotateLogEvent(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
+						} else {
+							event = decoder.decode(fetcher, context);
+						}
+												
+						if (event != null && !func.sink(event)) {
+							needContinue = false;
+							break L;
+						}
+					} while (event != null);
                 }
 
                 if (needContinue) {// 读取下一个
@@ -145,6 +150,7 @@ public class LocalBinLogConnection implements ErosaConnection {
 
         FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
         LogDecoder decoder = new LogDecoder();
+        decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();

+ 66 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -1,10 +1,14 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
+import java.io.IOException;
+
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -21,7 +25,9 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
     // 数据库信息
     private AuthenticationInfo masterInfo;
     private EntryPosition      masterPosition;        // binlog信息
-
+    private MysqlConnection    metaConnection;        // 查询meta信息的链接
+    private TableMetaCache     tableMetaCache;        // 对应meta
+    
     private String             directory;
     private boolean            needWait   = false;
     private int                bufferSize = 16 * 1024;
@@ -33,17 +39,60 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
     @Override
     protected ErosaConnection buildErosaConnection() {
         return buildLocalBinLogConnection();
-    }
+    }   
+    
+    @Override
+	protected void preDump(ErosaConnection connection) {
+    	metaConnection = buildMysqlConnection();
+        try {
+            metaConnection.connect();
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+        
+        tableMetaCache = new TableMetaCache(metaConnection);
+        ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
+	}
+
+	@Override
+	protected void afterDump(ErosaConnection connection) {
+		if (metaConnection != null) {
+            try {
+                metaConnection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", metaConnection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+	}
 
-    public void start() throws CanalParseException {
+	public void start() throws CanalParseException {
         if (runningInfo == null) { // 第一次链接主库
             runningInfo = masterInfo;
         }
 
         super.start();
     }
+	
+    @Override
+	public void stop() {
+    	if (metaConnection != null) {
+            try {
+                metaConnection.disconnect();
+            } catch (IOException e) {
+                logger.error("ERROR # disconnect meta connection for address:{}", metaConnection.getConnector()
+                    .getAddress(), e);
+            }
+        }
+
+        if (tableMetaCache != null) {
+            tableMetaCache.clearTableMeta();
+        }
 
-    private ErosaConnection buildLocalBinLogConnection() {
+        super.stop();
+	}
+
+	private ErosaConnection buildLocalBinLogConnection() {
         LocalBinLogConnection connection = new LocalBinLogConnection();
 
         connection.setBufferSize(this.bufferSize);
@@ -52,6 +101,19 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
 
         return connection;
     }
+
+    private MysqlConnection buildMysqlConnection() {
+        MysqlConnection connection = new MysqlConnection(runningInfo.getAddress(),
+            runningInfo.getUsername(),
+            runningInfo.getPassword(),
+            connectionCharsetNumber,
+            runningInfo.getDefaultDatabaseName());
+        connection.getConnector().setReceiveBufferSize(64 * 1024);
+        connection.getConnector().setSendBufferSize(64 * 1024);
+        connection.getConnector().setSoTimeout(30 * 1000);
+        connection.setCharset(connectionCharset);
+        return connection;
+    }
     
     @Override
     protected EntryPosition findStartPosition(ErosaConnection connection) {

+ 37 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -6,6 +6,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,6 +25,8 @@ import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.LogPosition;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
@@ -34,6 +37,7 @@ public class MysqlConnection implements ErosaConnection {
     private Charset             charset = Charset.forName("UTF-8");
     private BinlogFormat        binlogFormat;
     private BinlogImage         binlogImage;
+    private int                 binlogChecksum;
 
     public MysqlConnection(){
     }
@@ -79,7 +83,7 @@ public class MysqlConnection implements ErosaConnection {
      */
     public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
-
+        loadBinlogChecksum();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
@@ -89,6 +93,8 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
+        context.setLogPosition(new LogPosition(binlogfilename));
+        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             LogEvent event = null;
             event = decoder.decode(fetcher, context);
@@ -105,11 +111,14 @@ public class MysqlConnection implements ErosaConnection {
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
+        loadBinlogChecksum();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
         LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
         LogContext context = new LogContext();
+        context.setLogPosition(new LogPosition(binlogfilename));
+        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             LogEvent event = null;
             event = decoder.decode(fetcher, context);
@@ -196,9 +205,12 @@ public class MysqlConnection implements ErosaConnection {
             // 如果不设置会出现错误: Slave can not handle replication events with the
             // checksum that master is configured to log
             // 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
-            update("set @master_binlog_checksum= '@@global.binlog_checksum'");
+            // '@@global.binlog_checksum'需要去掉单引号,在mysql 5.6.29下导致master退出
+            update("set @master_binlog_checksum= @@global.binlog_checksum");
         } catch (Exception e) {
-            logger.warn(ExceptionUtils.getFullStackTrace(e));
+            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
+                logger.warn(ExceptionUtils.getFullStackTrace(e));
+            }
         }
 
         try {
@@ -256,6 +268,28 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
+    /**
+     * 获取主库checksum信息
+     * https://dev.mysql.com/doc/refman/5.6/en/replication-options
+     * -binary-log.html#option_mysqld_binlog-checksum
+     */
+    private void loadBinlogChecksum() {
+        ResultSetPacket rs = null;
+        try {
+            rs = query("select @master_binlog_checksum");
+        } catch (IOException e) {
+            throw new CanalParseException(e);
+        }
+
+        List<String> columnValues = rs.getFieldValues();
+        if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0) != null
+            && columnValues.get(0).toUpperCase().equals("CRC32")) {
+            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+        } else {
+            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
+        }
+    }
+
     public static enum BinlogFormat {
 
         STATEMENT("STATEMENT"), ROW("ROW"), MIXED("MIXED");

+ 8 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -13,6 +13,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.springframework.util.CollectionUtils;
 
+import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.CanalHASwitchable;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
@@ -93,6 +94,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 for (BinlogFormat supportFormat : supportBinlogFormats) {
                     if (supportFormat != null && format == supportFormat) {
                         found = true;
+                        break;
                     }
                 }
 
@@ -107,6 +109,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 for (BinlogImage supportImage : supportBinlogImages) {
                     if (supportImage != null && image == supportImage) {
                         found = true;
+                        break;
                     }
                 }
 
@@ -177,10 +180,9 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     }
 
     protected void stopHeartBeat() {
+        TimerTask heartBeatTimerTask = this.heartBeatTimerTask;
         super.stopHeartBeat();
-
-        if (heartBeatTimerTask != null) {
-
+        if (heartBeatTimerTask != null && heartBeatTimerTask instanceof MysqlDetectingTimeTask) {
             MysqlConnection mysqlConnection = ((MysqlDetectingTimeTask) heartBeatTimerTask).getMysqlConnection();
             try {
                 mysqlConnection.disconnect();
@@ -395,7 +397,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                     // binlog定位位点失败,可能有两个原因:
                     // 1. binlog位点被删除
                     // 2.vip模式的mysql,发生了主备切换,判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点
-                    boolean case2 = (standbyInfo.getAddress() == null)
+                    boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
                                     && logPosition.getPostion().getServerId() != null
                                     && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
                     if (case2) {
@@ -410,7 +412,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                     }
                 }
                 // 其余情况
-                logger.warn("prepare to find start position just last position");
+                logger.warn("prepare to find start position just last position\n {}",
+                    JsonUtils.marshalToString(logPosition));
                 return logPosition.getPostion();
             } else {
                 // 针对切换的情况,考虑回退时间

+ 54 - 22
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -85,7 +85,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     // 是否跳过table相关的解析异常,比如表不存在或者列数量不匹配,issue 92
     private boolean                     filterTableError    = false;
     // 新增rows过滤,用于仅订阅除rows以外的数据
-    private boolean                     filterRows      = false;
+    private boolean                     filterRows          = false;
 
     public Entry parse(LogEvent logEvent) throws CanalParseException {
         if (logEvent == null || logEvent instanceof UnknownLogEvent) {
@@ -326,6 +326,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 return null;
             }
 
+            if (tableMetaCache.isOnRDS() && "mysql.ha_health_check".equals(fullname)) {
+                // 忽略rds模式的mysql.ha_health_check心跳数据
+                return null;
+            }
+
             EventType eventType = null;
             int type = event.getHeader().getType();
             if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type) {
@@ -402,34 +407,51 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event, RowsLogBuffer buffer, BitSet cols,
                                 boolean isAfter, TableMeta tableMeta) throws UnsupportedEncodingException {
-        final int columnCnt = event.getTable().getColumnCnt();
-        final ColumnInfo[] columnInfo = event.getTable().getColumnInfo();
+        int columnCnt = event.getTable().getColumnCnt();
+        ColumnInfo[] columnInfo = event.getTable().getColumnInfo();
 
         boolean tableError = false;
         // check table fileds count,只能处理加字段
+        boolean existRDSNoPrimaryKey = false;
         if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
-            // online ddl增加字段操作步骤:
-            // 1. 新增一张临时表,将需要做ddl表的数据全量导入
-            // 2. 在老表上建立I/U/D的trigger,增量的将数据插入到临时表
-            // 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
-            // 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
-            // 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
-            tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false);// 强制重新获取一次
-            if (tableMeta == null) {
-                tableError = true;
-                if (!filterTableError) {
-                    throw new CanalParseException("not found [" + event.getTable().getDbName() + "."
-                                                  + event.getTable().getTableName() + "] in db , pls check!");
+            if (tableMetaCache.isOnRDS()) {
+                // 特殊处理下RDS的场景
+                List<FieldMeta> primaryKeys = tableMeta.getPrimaryFields();
+                if (primaryKeys == null || primaryKeys.isEmpty()) {
+                    if (columnInfo.length == tableMeta.getFileds().size() + 1
+                        && columnInfo[columnInfo.length - 1].type == LogEvent.MYSQL_TYPE_LONGLONG) {
+                        existRDSNoPrimaryKey = true;
+                    }
                 }
             }
 
-            // 在做一次判断
-            if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
-                tableError = true;
-                if (!filterTableError) {
-                    throw new CanalParseException("column size is not match for table:" + tableMeta.getFullName() + ","
-                                                  + columnInfo.length + " vs " + tableMeta.getFileds().size());
+            if (!existRDSNoPrimaryKey) {
+                // online ddl增加字段操作步骤:
+                // 1. 新增一张临时表,将需要做ddl表的数据全量导入
+                // 2. 在老表上建立I/U/D的trigger,增量的将数据插入到临时表
+                // 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
+                // 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
+                // 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
+                tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false);// 强制重新获取一次
+                if (tableMeta == null) {
+                    tableError = true;
+                    if (!filterTableError) {
+                        throw new CanalParseException("not found [" + event.getTable().getDbName() + "."
+                                                      + event.getTable().getTableName() + "] in db , pls check!");
+                    }
+                }
+
+                // 在做一次判断
+                if (tableMeta != null && columnInfo.length > tableMeta.getFileds().size()) {
+                    tableError = true;
+                    if (!filterTableError) {
+                        throw new CanalParseException("column size is not match for table:" + tableMeta.getFullName()
+                                                      + "," + columnInfo.length + " vs " + tableMeta.getFileds().size());
+                    }
                 }
+            } else {
+                logger.warn("[" + event.getTable().getDbName() + "." + event.getTable().getTableName()
+                            + "] is no primary key , skip alibaba_rds_row_id column");
             }
         }
 
@@ -440,6 +462,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 continue;
             }
 
+            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
+                // 不解析最后一列
+                buffer.nextValue(info.type, info.meta, false);
+                continue;
+            }
+
             Column.Builder columnBuilder = Column.newBuilder();
 
             FieldMeta fieldMeta = null;
@@ -465,6 +493,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 }
             }
             buffer.nextValue(info.type, info.meta, isBinary);
+            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
+                // 不解析最后一列
+                continue;
+            }
 
             int javaType = buffer.getJavaType();
             if (buffer.isNull()) {
@@ -718,7 +750,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     public void setFilterTableError(boolean filterTableError) {
         this.filterTableError = filterTableError;
     }
-    
+
     public void setFilterRows(boolean filterRows) {
         this.filterRows = filterRows;
     }

+ 17 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -32,6 +32,7 @@ public class TableMetaCache {
     public static final String     COLUMN_DEFAULT = "COLUMN_DEFAULT";
     public static final String     EXTRA          = "EXTRA";
     private MysqlConnection        connection;
+    private boolean                isOnRDS        = false;
 
     // 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
     private Map<String, TableMeta> tableMetaCache;
@@ -56,6 +57,13 @@ public class TableMetaCache {
 
         });
 
+        try {
+            ResultSetPacket packet = connection.query("show global variables  like 'rds\\_%'");
+            if (packet.getFieldValues().size() > 0) {
+                isOnRDS = true;
+            }
+        } catch (IOException e) {
+        }
     }
 
     public TableMeta getTableMeta(String schema, String table) {
@@ -136,4 +144,13 @@ public class TableMetaCache {
             .append('`')
             .toString();
     }
+
+    public boolean isOnRDS() {
+        return isOnRDS;
+    }
+
+    public void setOnRDS(boolean isOnRDS) {
+        this.isOnRDS = isOnRDS;
+    }
+
 }

+ 5 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

@@ -9,6 +9,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.IOFileFilter;
@@ -188,7 +190,9 @@ public class BinLogFileQueue {
         files.addAll(FileUtils.listFiles(directory, new IOFileFilter() {
 
             public boolean accept(File file) {
-                return file.getName().startsWith(baseName);
+				Pattern pattern = Pattern.compile("\\d+$");
+				Matcher matcher = pattern.matcher(file.getName());
+				return file.getName().startsWith(baseName) && matcher.find();
             }
 
             public boolean accept(File dir, String name) {

+ 73 - 56
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParserTest.java

@@ -13,6 +13,7 @@ import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.LogIdentity;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
@@ -22,8 +23,8 @@ public class MysqlEventParserTest {
 
     private static final String DETECTING_SQL = "insert into retl.xdual values(1,now()) on duplicate key update x=now()";
     private static final String MYSQL_ADDRESS = "127.0.0.1";
-    private static final String USERNAME      = "xxxxx";
-    private static final String PASSWORD      = "xxxxx";
+    private static final String USERNAME      = "root";
+    private static final String PASSWORD      = "xxxxxx";
 
     @Test
     public void test_position() throws InterruptedException {
@@ -45,20 +46,23 @@ public class MysqlEventParserTest {
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                         throws CanalSinkException {
                 for (Entry entry : entrys) {
-                    entryCount.incrementAndGet();
-                    String logfilename = entry.getHeader().getLogfileName();
-                    long logfileoffset = entry.getHeader().getLogfileOffset();
-                    long executeTime = entry.getHeader().getExecuteTime();
-                    entryPosition.setJournalName(logfilename);
-                    entryPosition.setPosition(logfileoffset);
-                    entryPosition.setTimestamp(executeTime);
-
-                    break;
+                    if (entry.getEntryType() != EntryType.HEARTBEAT) {
+                        entryCount.incrementAndGet();
+                        String logfilename = entry.getHeader().getLogfileName();
+                        long logfileoffset = entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
+                        entryPosition.setJournalName(logfilename);
+                        entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
                 }
 
-                controller.stop();
-                timeoutChecker.stop();
-                timeoutChecker.touch();
+                if (entryCount.get() > 0) {
+                    controller.stop();
+                    timeoutChecker.stop();
+                    timeoutChecker.touch();
+                }
                 return true;
             }
         });
@@ -92,14 +96,14 @@ public class MysqlEventParserTest {
 
     @Test
     public void test_timestamp() throws InterruptedException {
-        final TimeoutChecker timeoutChecker = new TimeoutChecker(30 * 1000);
+        final TimeoutChecker timeoutChecker = new TimeoutChecker(3000 * 1000);
         final AtomicLong entryCount = new AtomicLong(0);
         final EntryPosition entryPosition = new EntryPosition();
 
         final MysqlEventParser controller = new MysqlEventParser();
-        final EntryPosition defaultPosition = buildPosition(null, null, 1322803601000L);
+        final EntryPosition defaultPosition = buildPosition(null, null, 1475116855000L);
         controller.setSlaveId(3344L);
-        controller.setDetectingEnable(true);
+        controller.setDetectingEnable(false);
         controller.setDetectingSQL(DETECTING_SQL);
         controller.setMasterInfo(buildAuthentication());
         controller.setMasterPosition(defaultPosition);
@@ -109,22 +113,25 @@ public class MysqlEventParserTest {
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                         throws CanalSinkException {
                 for (Entry entry : entrys) {
-                    entryCount.incrementAndGet();
-
-                    String logfilename = entry.getHeader().getLogfileName();
-                    long logfileoffset = entry.getHeader().getLogfileOffset();
-                    long executeTime = entry.getHeader().getExecuteTime();
-
-                    entryPosition.setJournalName(logfilename);
-                    entryPosition.setPosition(logfileoffset);
-                    entryPosition.setTimestamp(executeTime);
-
-                    break;
+                    if (entry.getEntryType() != EntryType.HEARTBEAT) {
+                        entryCount.incrementAndGet();
+
+                        String logfilename = entry.getHeader().getLogfileName();
+                        long logfileoffset = entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
+
+                        entryPosition.setJournalName(logfilename);
+                        entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
                 }
 
-                controller.stop();
-                timeoutChecker.stop();
-                timeoutChecker.touch();
+                if (entryCount.get() > 0) {
+                    controller.stop();
+                    timeoutChecker.stop();
+                    timeoutChecker.touch();
+                }
                 return true;
             }
         });
@@ -174,21 +181,26 @@ public class MysqlEventParserTest {
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                         throws CanalSinkException {
                 for (Entry entry : entrys) {
-                    entryCount.incrementAndGet();
+                    if (entry.getEntryType() != EntryType.HEARTBEAT) {
+
+                        entryCount.incrementAndGet();
 
-                    String logfilename = entry.getHeader().getLogfileName();
-                    long logfileoffset = entry.getHeader().getLogfileOffset();
-                    long executeTime = entry.getHeader().getExecuteTime();
+                        String logfilename = entry.getHeader().getLogfileName();
+                        long logfileoffset = entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
 
-                    entryPosition.setJournalName(logfilename);
-                    entryPosition.setPosition(logfileoffset);
-                    entryPosition.setTimestamp(executeTime);
-                    break;
+                        entryPosition.setJournalName(logfilename);
+                        entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
                 }
 
-                controller.stop();
-                timeoutChecker.stop();
-                timeoutChecker.touch();
+                if (entryCount.get() > 0) {
+                    controller.stop();
+                    timeoutChecker.stop();
+                    timeoutChecker.touch();
+                }
                 return true;
             }
         });
@@ -244,22 +256,27 @@ public class MysqlEventParserTest {
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                         throws CanalSinkException {
                 for (Entry entry : entrys) {
-                    entryCount.incrementAndGet();
-
-                    // String logfilename = entry.getHeader().getLogfileName();
-                    // long logfileoffset =
-                    // entry.getHeader().getLogfileOffset();
-                    long executeTime = entry.getHeader().getExecuteTime();
-
-                    // entryPosition.setJournalName(logfilename);
-                    // entryPosition.setPosition(logfileoffset);
-                    entryPosition.setTimestamp(executeTime);
-                    break;
+                    if (entry.getEntryType() != EntryType.HEARTBEAT) {
+                        entryCount.incrementAndGet();
+
+                        // String logfilename =
+                        // entry.getHeader().getLogfileName();
+                        // long logfileoffset =
+                        // entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
+
+                        // entryPosition.setJournalName(logfilename);
+                        // entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
                 }
 
-                controller.stop();
-                timeoutChecker.stop();
-                timeoutChecker.touch();
+                if (entryCount.get() > 0) {
+                    controller.stop();
+                    timeoutChecker.stop();
+                    timeoutChecker.touch();
+                }
                 return true;
             }
         });

+ 2 - 2
pom.xml

@@ -4,7 +4,7 @@
     <artifactId>canal</artifactId>
     <packaging>pom</packaging>
     <name>canal module for otter ${project.version}</name>
-    <version>1.0.23-SNAPSHOT</version>
+    <version>1.0.24-SNAPSHOT</version>
     <url>https://github.com/alibaba/canal</url>
     <parent>
         <groupId>org.sonatype.oss</groupId>
@@ -202,7 +202,7 @@
             <dependency>
                 <groupId>com.google.protobuf</groupId>
                 <artifactId>protobuf-java</artifactId>
-                <version>2.4.1</version>
+                <version>2.6.1</version>
             </dependency>
             <!-- log -->
             <dependency>

+ 1 - 1
protocol/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

Fichier diff supprimé car celui-ci est trop grand
+ 477 - 177
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java


Fichier diff supprimé car celui-ci est trop grand
+ 431 - 188
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalPacket.java


+ 1 - 1
server/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<artifactId>canal.server</artifactId>

+ 2 - 0
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -131,6 +131,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
      */
     @Override
     public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
+    	checkStart(clientIdentity.getDestination());
+    	
         CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
         if (!canalInstance.getMetaManager().isStart()) {
             canalInstance.getMetaManager().start();

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -66,7 +66,6 @@ public class SessionHandler extends SimpleChannelHandler {
                             Short.valueOf(sub.getClientId()),
                             sub.getFilter());
                         MDC.put("destination", clientIdentity.getDestination());
-                        embeddedServer.subscribe(clientIdentity);
 
                         // 尝试启动,如果已经启动,忽略
                         if (!embeddedServer.isStart(clientIdentity.getDestination())) {
@@ -76,6 +75,7 @@ public class SessionHandler extends SimpleChannelHandler {
                             }
                         }
 
+                        embeddedServer.subscribe(clientIdentity);
                         ctx.setAttachment(clientIdentity);// 设置状态数据
                         NettyUtils.ack(ctx.getChannel(), null);
                     } else {

+ 1 - 1
sink/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

+ 1 - 1
store/pom.xml

@@ -3,7 +3,7 @@
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
 		<artifactId>canal</artifactId>
-		<version>1.0.23-SNAPSHOT</version>
+		<version>1.0.24-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>com.alibaba.otter</groupId>

Certains fichiers n'ont pas été affichés car il y a eu trop de fichiers modifiés dans ce diff