Selaa lähdekoodia

Merge pull request #1 from alibaba/master

pull
rewerma 7 vuotta sitten
vanhempi
commit
ad201660a6
31 muutettua tiedostoa jossa 1082 lisäystä ja 647 poistoa
  1. 3 1
      client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
  2. 2 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/CharsetConversion.java
  3. 32 3
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
  4. 19 11
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java
  5. 11 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java
  6. 3 3
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java
  7. 16 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TransactionContextLogEvent.java
  8. 16 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/ViewChangeEvent.java
  9. 65 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/XaPrepareLogEvent.java
  10. 19 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/StartEncryptionLogEvent.java
  11. 3 2
      deployer/src/main/resources/example/instance.properties
  12. 5 0
      deployer/src/main/resources/spring/default-instance.xml
  13. 1 1
      deployer/src/main/resources/spring/file-instance.xml
  14. 4 0
      deployer/src/main/resources/spring/group-instance.xml
  15. 5 0
      deployer/src/main/resources/spring/memory-instance.xml
  16. 40 14
      example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java
  17. 5 4
      meta/src/main/java/com/alibaba/otter/canal/meta/FileMixedMetaManager.java
  18. 2 1
      meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java
  19. 1 12
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  20. 13 17
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  21. 5 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  22. 84 6
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  23. 0 55
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvertGTID.java
  24. 3 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
  25. 27 2
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java
  26. 30 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java
  27. 40 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta_DDL_Test.java
  28. 21 0
      parse/src/test/resources/ddl/ddl_test1.sql
  29. 1 1
      pom.xml
  30. 603 507
      protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java
  31. 3 0
      protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

+ 3 - 1
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -457,8 +457,10 @@ public class SimpleCanalConnector implements CanalConnector {
                 }
 
                 running = true;
-
                 mutex.get();// 阻塞等待
+            } else {
+                // 单机模式直接设置为running
+                running = true;
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();

+ 2 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/CharsetConversion.java

@@ -103,8 +103,8 @@ public final class CharsetConversion {
         putEntry(42, "latin7", "latin7_general_cs", "ISO8859_7");
         putEntry(43, "macce", "macce_bin", "MacCentralEurope");
         putEntry(44, "cp1250", "cp1250_croatian_ci", "Cp1250");
-        putEntry(45, "utf8mb4", "utf8mb4_general_ci", "MacCentralEurope");
-        putEntry(46, "utf8mb4", "utf8mb4_bin", "MacCentralEurope");
+        putEntry(45, "utf8mb4", "utf8mb4_general_ci", "UTF-8");
+        putEntry(46, "utf8mb4", "utf8mb4_bin", "UTF-8");
         putEntry(47, "latin1", "latin1_bin", "ISO8859_1");
         putEntry(48, "latin1", "latin1_general_ci", "ISO8859_1");
         putEntry(49, "latin1", "latin1_general_cs", "ISO8859_1");

+ 32 - 3
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -30,15 +30,19 @@ import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.StartLogEventV3;
 import com.taobao.tddl.dbsync.binlog.event.StopLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.TransactionContextLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UnknownLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.ViewChangeEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.XaPrepareLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
 
 /**
  * Implements a binary-log decoder.
@@ -366,6 +370,24 @@ public final class LogDecoder {
                 logPosition.position = header.getLogPos();
                 return event;
             }
+            case LogEvent.TRANSACTION_CONTEXT_EVENT: {
+                TransactionContextLogEvent event = new TransactionContextLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                return event;
+            }
+            case LogEvent.VIEW_CHANGE_EVENT: {
+                ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                return event;
+            }
+            case LogEvent.XA_PREPARE_LOG_EVENT: {
+                XaPrepareLogEvent event = new XaPrepareLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                return event;
+            }
             case LogEvent.ANNOTATE_ROWS_EVENT: {
                 AnnotateRowsEvent event = new AnnotateRowsEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
@@ -390,6 +412,12 @@ public final class LogDecoder {
                 logPosition.position = header.getLogPos();
                 return event;
             }
+            case LogEvent.START_ENCRYPTION_EVENT: {
+                StartEncryptionLogEvent event = new StartEncryptionLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                return event;
+            }
             default:
                 /*
                  * Create an object of Ignorable_log_event for unrecognized
@@ -402,9 +430,10 @@ public final class LogDecoder {
                     logPosition.position = header.getLogPos();
                     return event;
                 } else {
-                    if (logger.isWarnEnabled()) logger.warn("Skipping unrecognized binlog event "
-                                                            + LogEvent.getTypeName(header.getType()) + " from: "
-                                                            + context.getLogPosition());
+                    if (logger.isWarnEnabled()) {
+                        logger.warn("Skipping unrecognized binlog event " + LogEvent.getTypeName(header.getType())
+                                    + " from: " + context.getLogPosition());
+                    }
                 }
         }
 

+ 19 - 11
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java

@@ -161,9 +161,17 @@ public abstract class LogEvent {
 
     public static final int    PREVIOUS_GTIDS_LOG_EVENT                 = 35;
 
+    /* MySQL 5.7 events */
+    public static final int    TRANSACTION_CONTEXT_EVENT                = 36;
+
+    public static final int    VIEW_CHANGE_EVENT                        = 37;
+
+    /* Prepared XA transaction terminal event similar to Xid */
+    public static final int    XA_PREPARE_LOG_EVENT                     = 38;
+
     // mariaDb 5.5.34
     /* New MySQL/Sun events are to be added right above this comment */
-    public static final int    MYSQL_EVENTS_END                         = 36;
+    public static final int    MYSQL_EVENTS_END                         = 39;
 
     public static final int    MARIA_EVENTS_BEGIN                       = 160;
     /* New Maria event numbers start from here */
@@ -189,8 +197,10 @@ public abstract class LogEvent {
      */
     public static final int    GTID_LIST_EVENT                          = 163;
 
+    public static final int    START_ENCRYPTION_EVENT                   = 164;
+
     /** end marker */
-    public static final int    ENUM_END_EVENT                           = 164;
+    public static final int    ENUM_END_EVENT                           = 165;
 
     /**
      * 1 byte length, 1 byte format Length is total length in bytes, including 2
@@ -359,7 +369,7 @@ public abstract class LogEvent {
     protected static final Log logger = LogFactory.getLog(LogEvent.class);
 
     protected final LogHeader  header;
-    
+
     /**
      * mysql半同步semi标识
      * 
@@ -369,18 +379,16 @@ public abstract class LogEvent {
      * </pre>
      */
     protected int              semival;
-    
-    
 
     public int getSemival() {
-		return semival;
-	}
+        return semival;
+    }
 
-	public void setSemival(int semival) {
-		this.semival = semival;
-	}
+    public void setSemival(int semival) {
+        this.semival = semival;
+    }
 
-	protected LogEvent(LogHeader header){
+    protected LogEvent(LogHeader header){
         this.header = header;
     }
 

+ 11 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java

@@ -59,10 +59,15 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
     public static final int   HEARTBEAT_HEADER_LEN                = 0;
     public static final int   IGNORABLE_HEADER_LEN                = 0;
     public static final int   ROWS_HEADER_LEN_V2                  = 10;
+    public static final int   TRANSACTION_CONTEXT_HEADER_LEN      = 18;
+    public static final int   VIEW_CHANGE_HEADER_LEN              = 52;
+    public static final int   XA_PREPARE_HEADER_LEN               = 0;
+
     public static final int   ANNOTATE_ROWS_HEADER_LEN            = 0;
     public static final int   BINLOG_CHECKPOINT_HEADER_LEN        = 4;
     public static final int   GTID_HEADER_LEN                     = 19;
     public static final int   GTID_LIST_HEADER_LEN                = 4;
+    public static final int   START_ENCRYPTION_HEADER_LEN         = 0;
 
     public static final int   POST_HEADER_LENGTH                  = 11;
 
@@ -202,11 +207,17 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
                 postHeaderLen[GTID_LOG_EVENT - 1] = POST_HEADER_LENGTH;
                 postHeaderLen[ANONYMOUS_GTID_LOG_EVENT - 1] = POST_HEADER_LENGTH;
                 postHeaderLen[PREVIOUS_GTIDS_LOG_EVENT - 1] = IGNORABLE_HEADER_LEN;
+
+                postHeaderLen[TRANSACTION_CONTEXT_EVENT - 1] = TRANSACTION_CONTEXT_HEADER_LEN;
+                postHeaderLen[VIEW_CHANGE_EVENT - 1] = VIEW_CHANGE_HEADER_LEN;
+                postHeaderLen[XA_PREPARE_LOG_EVENT - 1] = XA_PREPARE_HEADER_LEN;
+
                 // mariadb 10
                 postHeaderLen[ANNOTATE_ROWS_EVENT - 1] = ANNOTATE_ROWS_HEADER_LEN;
                 postHeaderLen[BINLOG_CHECKPOINT_EVENT - 1] = BINLOG_CHECKPOINT_HEADER_LEN;
                 postHeaderLen[GTID_EVENT - 1] = GTID_HEADER_LEN;
                 postHeaderLen[GTID_LIST_EVENT - 1] = GTID_LIST_HEADER_LEN;
+                postHeaderLen[START_ENCRYPTION_EVENT - 1] = START_ENCRYPTION_HEADER_LEN;
                 break;
 
             case 3: /* 4.0.x x>=2 */

+ 3 - 3
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java

@@ -1,11 +1,11 @@
 package com.taobao.tddl.dbsync.binlog.event;
 
-import com.taobao.tddl.dbsync.binlog.LogBuffer;
-import com.taobao.tddl.dbsync.binlog.LogEvent;
-
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
 /**
  * @author jianghang 2013-4-8 上午12:36:29
  * @version 1.0.3

+ 16 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TransactionContextLogEvent.java

@@ -0,0 +1,16 @@
+package com.taobao.tddl.dbsync.binlog.event;
+
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
+/**
+ * @author agapple 2018年5月7日 下午7:05:39
+ * @version 1.0.26
+ * @since mysql 5.7
+ */
+public class TransactionContextLogEvent extends LogEvent {
+
+    public TransactionContextLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
+        super(header);
+    }
+}

+ 16 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/ViewChangeEvent.java

@@ -0,0 +1,16 @@
+package com.taobao.tddl.dbsync.binlog.event;
+
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
+/**
+ * @author agapple 2018年5月7日 下午7:05:39
+ * @version 1.0.26
+ * @since mysql 5.7
+ */
+public class ViewChangeEvent extends LogEvent {
+
+    public ViewChangeEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
+        super(header);
+    }
+}

+ 65 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/XaPrepareLogEvent.java

@@ -0,0 +1,65 @@
+package com.taobao.tddl.dbsync.binlog.event;
+
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
+/**
+ * @author agapple 2018年5月7日 下午7:05:39
+ * @version 1.0.26
+ * @since mysql 5.7
+ */
+public class XaPrepareLogEvent extends LogEvent {
+
+    private boolean onePhase;
+    private int     formatId;
+    private int     gtridLength;
+    private int     bqualLength;
+    private byte[]  data;
+
+    public XaPrepareLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
+        super(header);
+
+        final int commonHeaderLen = descriptionEvent.getCommonHeaderLen();
+        final int postHeaderLen = descriptionEvent.getPostHeaderLen()[header.getType() - 1];
+
+        int offset = commonHeaderLen + postHeaderLen;
+        buffer.position(offset);
+
+        onePhase = (buffer.getInt8() == 0x00 ? false : true);
+
+        formatId = buffer.getInt32();
+        gtridLength = buffer.getInt32();
+        bqualLength = buffer.getInt32();
+
+        int MY_XIDDATASIZE = 128;
+        if (MY_XIDDATASIZE >= gtridLength + bqualLength && gtridLength >= 0 && gtridLength <= 64 && bqualLength >= 0
+            && bqualLength <= 64) {
+            data = buffer.getData(gtridLength + bqualLength);
+        } else {
+            formatId = -1;
+            gtridLength = 0;
+            bqualLength = 0;
+        }
+    }
+
+    public boolean isOnePhase() {
+        return onePhase;
+    }
+
+    public int getFormatId() {
+        return formatId;
+    }
+
+    public int getGtridLength() {
+        return gtridLength;
+    }
+
+    public int getBqualLength() {
+        return bqualLength;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+}

+ 19 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/StartEncryptionLogEvent.java

@@ -0,0 +1,19 @@
+package com.taobao.tddl.dbsync.binlog.event.mariadb;
+
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.LogHeader;
+
+/**
+ * mariadb的Start_encryption_log_event
+ * 
+ * @author agapple 2018年5月7日 下午7:23:02
+ * @version 1.0.26
+ */
+public class StartEncryptionLogEvent extends LogEvent {
+
+    public StartEncryptionLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
+        super(header);
+    }
+}

+ 3 - 2
deployer/src/main/resources/example/instance.properties

@@ -1,15 +1,16 @@
 #################################################
 ## mysql serverId
 canal.instance.mysql.slaveId=0
+
 # position info
 canal.instance.master.address=127.0.0.1:3306
+# enable gtid use true/false
+canal.instance.gtidon=false
 canal.instance.master.journal.name=
 canal.instance.master.position=
 canal.instance.master.timestamp=
 canal.instance.master.gtid=
 
-canal.instance.gtidon=true
-
 # table meta tsdb info
 canal.instance.tsdb.enable=true
 canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}

+ 5 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -166,6 +166,7 @@
 				<property name="journalName" value="${canal.instance.master.journal.name}" />
 				<property name="position" value="${canal.instance.master.position}" />
 				<property name="timestamp" value="${canal.instance.master.timestamp}" />
+				<property name="gtid" value="${canal.instance.master.gtid}" />
 			</bean>
 		</property>
 		<property name="standbyPosition">
@@ -173,6 +174,7 @@
 				<property name="journalName" value="${canal.instance.standby.journal.name}" />
 				<property name="position" value="${canal.instance.standby.position}" />
 				<property name="timestamp" value="${canal.instance.standby.timestamp}" />
+				<property name="gtid" value="${canal.instance.standby.gtid}" />
 			</bean>
 		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
@@ -187,5 +189,8 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		
+		<!--是否启用GTID模式-->
+		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
 	</bean>
 </beans>

+ 1 - 1
deployer/src/main/resources/spring/file-instance.xml

@@ -176,6 +176,6 @@
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 
 		<!--是否启用GTID模式-->
-		<property name="isGTIDMode" value="${canal.instance.gtidon}"/>
+		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
 	</bean>
 </beans>

+ 4 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -148,6 +148,7 @@
 				<property name="journalName" value="${canal.instance.master1.journal.name}" />
 				<property name="position" value="${canal.instance.master1.position}" />
 				<property name="timestamp" value="${canal.instance.master1.timestamp}" />
+				<property name="gtid" value="${canal.instance.master1.gtid}" />
 			</bean>
 		</property>
 		<property name="standbyPosition">
@@ -155,6 +156,7 @@
 				<property name="journalName" value="${canal.instance.standby1.journal.name}" />
 				<property name="position" value="${canal.instance.standby1.position}" />
 				<property name="timestamp" value="${canal.instance.standby1.timestamp}" />
+				<property name="gtid" value="${canal.instance.standby1.gtid}" />
 			</bean>
 		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
@@ -239,6 +241,7 @@
 				<property name="journalName" value="${canal.instance.master2.journal.name}" />
 				<property name="position" value="${canal.instance.master2.position}" />
 				<property name="timestamp" value="${canal.instance.master2.timestamp}" />
+				<property name="gtid" value="${canal.instance.master2.gtid}" />
 			</bean>
 		</property>
 		<property name="standbyPosition">
@@ -246,6 +249,7 @@
 				<property name="journalName" value="${canal.instance.standby2.journal.name}" />
 				<property name="position" value="${canal.instance.standby2.position}" />
 				<property name="timestamp" value="${canal.instance.standby2.timestamp}" />
+				<property name="gtid" value="${canal.instance.standby2.gtid}" />
 			</bean>
 		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />

+ 5 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -139,6 +139,7 @@
 				<property name="journalName" value="${canal.instance.master.journal.name}" />
 				<property name="position" value="${canal.instance.master.position}" />
 				<property name="timestamp" value="${canal.instance.master.timestamp}" />
+				<property name="gtid" value="${canal.instance.master.gtid}" />
 			</bean>
 		</property>
 		<property name="standbyPosition">
@@ -146,6 +147,7 @@
 				<property name="journalName" value="${canal.instance.standby.journal.name}" />
 				<property name="position" value="${canal.instance.standby.position}" />
 				<property name="timestamp" value="${canal.instance.standby.timestamp}" />
+				<property name="gtid" value="${canal.instance.standby.gtid}" />
 			</bean>
 		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
@@ -160,5 +162,8 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		
+		<!--是否启用GTID模式-->
+		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
 	</bean>
 </beans>

+ 40 - 14
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -4,6 +4,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,6 +18,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
+import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
 import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
@@ -58,10 +60,12 @@ public class AbstractCanalClientTest {
         context_format += "****************************************************" + SEP;
 
         row_format = SEP
-                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , delay : {} ms"
+                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
                      + SEP;
 
-        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , delay : {}ms" + SEP;
+        transaction_format = SEP
+                             + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
+                             + SEP;
 
     }
 
@@ -92,13 +96,7 @@ public class AbstractCanalClientTest {
         if (!running) {
             return;
         }
-        running = false;
-        if (waiting) {
-            if (connector instanceof ClusterCanalConnector) {
-                ((ClusterCanalConnector) connector).setRetryTimes(-1);
-            }
-            thread.interrupt();
-        }
+        connector.stopRunning();
         if (thread != null) {
             try {
                 thread.join();
@@ -166,8 +164,12 @@ public class AbstractCanalClientTest {
         long time = entry.getHeader().getExecuteTime();
         Date date = new Date(time);
         SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
-               + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
+        String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
+                          + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
+        if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
+            position += " gtid(" + entry.getHeader().getGtid() + ")";
+        }
+        return position;
     }
 
     protected void printEntry(List<Entry> entrys) {
@@ -190,8 +192,9 @@ public class AbstractCanalClientTest {
                         new Object[] { entry.getHeader().getLogfileName(),
                                 String.valueOf(entry.getHeader().getLogfileOffset()),
                                 String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                                String.valueOf(delayTime) });
+                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
                     logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
+                    printXAInfo(begin.getPropsList());
                 } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                     TransactionEnd end = null;
                     try {
@@ -202,11 +205,12 @@ public class AbstractCanalClientTest {
                     // 打印事务提交信息,事务id
                     logger.info("----------------\n");
                     logger.info(" END ----> transaction id: {}", end.getTransactionId());
+                    printXAInfo(end.getPropsList());
                     logger.info(transaction_format,
                         new Object[] { entry.getHeader().getLogfileName(),
                                 String.valueOf(entry.getHeader().getLogfileOffset()),
                                 String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                                String.valueOf(delayTime) });
+                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
                 }
 
                 continue;
@@ -227,13 +231,14 @@ public class AbstractCanalClientTest {
                             String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                             entry.getHeader().getTableName(), eventType,
                             String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                            String.valueOf(delayTime) });
+                            entry.getHeader().getGtid(), String.valueOf(delayTime) });
 
                 if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                     logger.info(" sql ----> " + rowChage.getSql() + SEP);
                     continue;
                 }
 
+                printXAInfo(rowChage.getPropsList());
                 for (RowData rowData : rowChage.getRowDatasList()) {
                     if (eventType == EventType.DELETE) {
                         printColumn(rowData.getBeforeColumnsList());
@@ -260,6 +265,27 @@ public class AbstractCanalClientTest {
         }
     }
 
+    protected void printXAInfo(List<Pair> pairs) {
+        if (pairs == null) {
+            return;
+        }
+
+        String xaType = null;
+        String xaXid = null;
+        for (Pair pair : pairs) {
+            String key = pair.getKey();
+            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
+                xaType = pair.getValue();
+            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
+                xaXid = pair.getValue();
+            }
+        }
+
+        if (xaType != null && xaXid != null) {
+            logger.info(" ------> " + xaType + " " + xaXid);
+        }
+    }
+
     public void setConnector(CanalConnector connector) {
         this.connector = connector;
     }

+ 5 - 4
meta/src/main/java/com/alibaba/otter/canal/meta/FileMixedMetaManager.java

@@ -109,10 +109,11 @@ public class FileMixedMetaManager extends MemoryMetaManager implements CanalMeta
                         // 定时将内存中的最新值刷到file中,多次变更只刷一次
                         if (logger.isInfoEnabled()) {
                             LogPosition cursor = (LogPosition) getCursor(clientIdentity);
-                            logger.info("clientId:{} cursor:[{},{},{}] address[{}]",
-                                new Object[] { clientIdentity.getClientId(), cursor.getPostion().getJournalName(),
-                                        cursor.getPostion().getPosition(), cursor.getPostion().getTimestamp(),
-                                        cursor.getIdentity().getSourceAddress().toString() });
+                            logger.info("clientId:{} cursor:[{},{},{},{},{}] address[{}]", new Object[] {
+                                    clientIdentity.getClientId(), cursor.getPostion().getJournalName(),
+                                    cursor.getPostion().getPosition(), cursor.getPostion().getTimestamp(),
+                                    cursor.getPostion().getServerId(), cursor.getPostion().getGtid(),
+                                    cursor.getIdentity().getSourceAddress().toString() });
                         }
                         flushDataToFile(clientIdentity.getDestination());
                         updateCursorTasks.remove(clientIdentity);

+ 2 - 1
meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java

@@ -83,7 +83,8 @@ public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMe
     }
 
     public synchronized List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
-        return destinations.get(destination);
+        // fixed issue #657, fixed ConcurrentModificationException
+        return Lists.newArrayList(destinations.get(destination));
     }
 
     public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {

+ 1 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,8 +8,6 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -21,6 +19,7 @@ import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
@@ -90,7 +89,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected Throwable                              exception                  = null;
 
     protected boolean                                isGTIDMode                 = false; // 是否是GTID模式
-    protected String                                 GTIDSetStr                 = null;
 
     protected abstract BinlogParser buildParser();
 
@@ -550,13 +548,4 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     public void setIsGTIDMode(boolean isGTIDMode) {
         this.isGTIDMode = isGTIDMode;
     }
-
-    public String getGTIDSetStr() {
-        return GTIDSetStr;
-    }
-
-    public void setGTIDSetStr(String GTIDSetStr) {
-        this.GTIDSetStr = GTIDSetStr;
-    }
-
 }

+ 13 - 17
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -1,15 +1,13 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
-import java.io.IOException;
 import java.nio.charset.Charset;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvertGTID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
@@ -37,20 +35,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     protected boolean           useDruidDdlFilter       = true;
 
     protected BinlogParser buildParser() {
-        LogEventConvert convert;
-
-        if (isGTIDMode()) {
-            EntryPosition position;
-            try {
-                position = findStartPosition(null);
-            } catch (IOException e) {
-                throw new RuntimeException("findStartPosition failed.", e);
-            }
-            convert = new LogEventConvertGTID(MysqlGTIDSet.parse(position.getGtid()));
-        } else {
-            convert = new LogEventConvert();
-        }
-
+        LogEventConvert convert = new LogEventConvert();
         if (eventFilter != null && eventFilter instanceof AviaterRegexFilter) {
             convert.setNameFilter((AviaterRegexFilter) eventFilter);
         }
@@ -84,7 +69,18 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
      * @return
      */
     protected boolean processTableMeta(EntryPosition position) {
+        if (isGTIDMode()) {
+            if (binlogParser instanceof LogEventConvert) {
+                // 记录gtid
+                ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid()));
+            }
+        }
+
         if (tableMetaTSDB != null) {
+            if (position.getTimestamp() == null || position.getTimestamp() <= 0) {
+                throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0");
+            }
+
             return tableMetaTSDB.rollback(position);
         }
 

+ 5 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -10,10 +10,6 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
-import com.alibaba.otter.canal.parse.inbound.BinlogParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvertGTID;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.util.CollectionUtils;
 
@@ -354,8 +350,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             if (logPosition != null) {
                 return logPosition.getPostion();
             }
-            return masterPosition;
+
+            if (StringUtils.isNotEmpty(masterPosition.getGtid())) {
+                return masterPosition;
+            }
         }
+
         EntryPosition startPosition = findStartPositionInternal(connection);
         if (needTransactionPosition.get()) {
             logger.warn("prepare to find last position : {}", startPosition.toString());

+ 84 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
@@ -40,6 +41,7 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.google.protobuf.ByteString;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.LogHeader;
 import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
@@ -56,7 +58,6 @@ import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
-import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 
 /**
  * 基于{@linkplain LogEvent}转化为Entry对象的处理
@@ -66,6 +67,12 @@ import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
  */
 public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogParser<LogEvent> {
 
+    public static final String          XA_XID              = "XA_XID";
+    public static final String          XA_TYPE             = "XA_TYPE";
+    public static final String          XA_START            = "XA START";
+    public static final String          XA_END              = "XA END";
+    public static final String          XA_COMMIT           = "XA COMMIT";
+    public static final String          XA_ROLLBACK         = "XA ROLLBACK";
     public static final String          ISO_8859_1          = "ISO-8859-1";
     public static final String          UTF_8               = "UTF-8";
     public static final int             TINYINT_MAX_VALUE   = 256;
@@ -93,6 +100,17 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private boolean                     filterRows          = false;
     private boolean                     useDruidDdlFilter   = true;
 
+    // latest gtid
+    private GTIDSet                     gtidSet;
+
+    public LogEventConvert(GTIDSet gtidSet){
+        this.gtidSet = gtidSet;
+    }
+
+    public LogEventConvert(){
+
+    }
+
     @Override
     public Entry parse(LogEvent logEvent, boolean isSeek) throws CanalParseException {
         if (logEvent == null || logEvent instanceof UnknownLogEvent) {
@@ -148,16 +166,57 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
         LogHeader logHeader = logEvent.getHeader();
-        Header header = createHeader("", logHeader, "", "", EventType.GTID);
+        String value = logEvent.getSid().toString() + ":" + logEvent.getGno();
         Pair.Builder builder = Pair.newBuilder();
         builder.setKey("gtid");
-        builder.setValue(String.format("%s:%d", logEvent.getSid(), logEvent.getGno()));
+        builder.setValue(value);
+        if (gtidSet != null) {
+            gtidSet.update(value);
+        }
+
+        Header header = createHeader("", logHeader, "", "", EventType.GTID);
         return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());
     }
 
     private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
         String queryString = event.getQuery();
-        if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
+        if (StringUtils.startsWithIgnoreCase(queryString, XA_START)) {
+            // xa start use TransactionBegin
+            TransactionBegin.Builder beginBuilder = TransactionBegin.newBuilder();
+            beginBuilder.setThreadId(event.getSessionId());
+            beginBuilder.addProps(createSpecialPair(XA_TYPE, XA_START));
+            beginBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_START)));
+            TransactionBegin transactionBegin = beginBuilder.build();
+            Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+            return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
+        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_END)) {
+            // xa start use TransactionEnd
+            TransactionEnd.Builder endBuilder = TransactionEnd.newBuilder();
+            endBuilder.setTransactionId(String.valueOf(0L));
+            endBuilder.addProps(createSpecialPair(XA_TYPE, XA_END));
+            endBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_END)));
+            TransactionEnd transactionEnd = endBuilder.build();
+            Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+            return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
+        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_COMMIT)) {
+            // xa commit
+            Header header = createHeader(binlogFileName, event.getHeader(), "", "", EventType.XACOMMIT);
+            RowChange.Builder rowChangeBuider = RowChange.newBuilder();
+            rowChangeBuider.setSql(queryString);
+            rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_COMMIT));
+            rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_COMMIT)));
+            rowChangeBuider.setEventType(EventType.XACOMMIT);
+            return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
+        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_ROLLBACK)) {
+            // xa rollback
+            Header header = createHeader(binlogFileName, event.getHeader(), "", "", EventType.XAROLLBACK);
+            RowChange.Builder rowChangeBuider = RowChange.newBuilder();
+            rowChangeBuider.setSql(queryString);
+            rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_ROLLBACK));
+            rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_ROLLBACK)));
+            rowChangeBuider.setEventType(EventType.XAROLLBACK);
+            return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
+        } else if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
             TransactionBegin transactionBegin = createTransactionBegin(event.getSessionId());
             Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
             return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
@@ -220,6 +279,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         }
     }
 
+    private String getXaXid(String queryString, String type) {
+        return StringUtils.substringAfter(queryString, type);
+    }
+
     private boolean processFilter(String queryString, DdlResult result) {
         String schemaName = result.getSchemaName();
         String tableName = result.getTableName();
@@ -404,7 +467,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 // 处理alisql模式的test.heartbeat心跳数据
                 // 心跳表基本无权限,需要mock一个tableMeta
                 FieldMeta idMeta = new FieldMeta("id", "smallint(6)", false, true, null);
-                FieldMeta typeMeta = new FieldMeta("type", "int(11)", true, false, null);
+                FieldMeta typeMeta = new FieldMeta("ts", "int(11)", true, false, null);
                 tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
             }
 
@@ -571,11 +634,14 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             // fixed issue
             // https://github.com/alibaba/canal/issues/66,特殊处理binary/varbinary,不能做编码处理
             boolean isBinary = false;
+            boolean isSingleBit = false;
             if (fieldMeta != null) {
                 if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "VARBINARY")) {
                     isBinary = true;
                 } else if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "BINARY")) {
                     isBinary = true;
+                } else if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "tinyint(1)")) {
+                    isSingleBit = true;
                 }
             }
             buffer.nextValue(info.type, info.meta, isBinary);
@@ -585,6 +651,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             }
 
             int javaType = buffer.getJavaType();
+            if (isSingleBit && javaType == Types.TINYINT) {
+                javaType = Types.BIT;
+            }
             if (buffer.isNull()) {
                 columnBuilder.setIsNull(true);
             } else {
@@ -737,6 +806,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             headerBuilder.setTableName(tableName);
         }
         headerBuilder.setEventLength(logHeader.getEventLen());
+        // enable gtid position
+        if (gtidSet != null) {
+            String gtid = gtidSet.toString();
+            headerBuilder.setGtid(gtid);
+        }
         return headerBuilder.build();
     }
 
@@ -785,7 +859,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         return "LONGTEXT".equalsIgnoreCase(columnType) || "MEDIUMTEXT".equalsIgnoreCase(columnType)
                || "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType);
     }
-    
+
     private boolean isAliSQLHeartBeat(String schema, String table) {
         return "test".equalsIgnoreCase(schema) && "heartbeat".equalsIgnoreCase(table);
     }
@@ -857,4 +931,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         this.filterRows = filterRows;
     }
 
+    public void setGtidSet(GTIDSet gtidSet) {
+        this.gtidSet = gtidSet;
+    }
+
 }

+ 0 - 55
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvertGTID.java

@@ -1,55 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
-
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.parse.inbound.BinlogParser;
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.taobao.tddl.dbsync.binlog.LogEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by hiwjd on 2018/5/2.
- * hiwjd0@gmail.com
- */
-public class LogEventConvertGTID extends LogEventConvert implements BinlogParser<LogEvent> {
-
-    public static final Logger logger = LoggerFactory.getLogger(LogEventConvertGTID.class);
-
-    // latest gtid
-    private GTIDSet gtidSet;
-
-    public LogEventConvertGTID(GTIDSet gtidSet) {
-        this.gtidSet = gtidSet;
-    }
-
-    @Override
-    public CanalEntry.Entry parse(LogEvent event, boolean isSeek) throws CanalParseException {
-        CanalEntry.Entry entry = super.parse(event, isSeek);
-        if (entry == null) {
-            return null;
-        }
-
-        if (entry.getEntryType() == CanalEntry.EntryType.GTIDLOG) {
-            try {
-                CanalEntry.Pair pair = CanalEntry.Pair.parseFrom(entry.getStoreValue());
-                gtidSet.update(pair.getValue());
-
-            } catch (InvalidProtocolBufferException e) {
-                logger.error("retrive gtid failed.", e);
-                throw new CanalParseException("retrive gtid failed.", e);
-            }
-
-            return null;
-        }
-
-        if (gtidSet != null) {
-            String gtid = gtidSet.toString();
-            CanalEntry.Header header = entry.getHeader().toBuilder().setGtid(gtid).build();
-            entry = entry.toBuilder().setHeader(header).build();
-        }
-
-        return entry;
-    }
-}

+ 3 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -16,6 +16,7 @@ import com.alibaba.fastsql.sql.ast.SQLExpr;
 import com.alibaba.fastsql.sql.ast.SQLStatement;
 import com.alibaba.fastsql.sql.ast.expr.SQLCharExpr;
 import com.alibaba.fastsql.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.fastsql.sql.ast.expr.SQLMethodInvokeExpr;
 import com.alibaba.fastsql.sql.ast.expr.SQLNullExpr;
 import com.alibaba.fastsql.sql.ast.expr.SQLPropertyExpr;
 import com.alibaba.fastsql.sql.ast.statement.SQLColumnConstraint;
@@ -243,6 +244,8 @@ public class MemoryTableMeta implements TableMetaTSDB {
             return DruidDdlParser.unescapeName(((SQLIdentifierExpr) sqlName).getName());
         } else if (sqlName instanceof SQLCharExpr) {
             return ((SQLCharExpr) sqlName).getText();
+        } else if (sqlName instanceof SQLMethodInvokeExpr) {
+            return DruidDdlParser.unescapeName(((SQLMethodInvokeExpr) sqlName).getMethodName());
         } else {
             return sqlName.toString();
         }

+ 27 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java

@@ -4,6 +4,7 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -16,6 +17,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
+import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
@@ -27,8 +29,8 @@ public class MysqlDumpTest {
     @Test
     public void testSimple() {
         final MysqlEventParser controller = new MysqlEventParser();
-        final EntryPosition startPosition = new EntryPosition("mysql-bin.000001", 104606L);
-
+        final EntryPosition startPosition = new EntryPosition("mysql-bin.000012", 34051L, 100L);
+        // startPosition.setGtid("f1ceb61a-a5d5-11e7-bdee-107c3dbcf8a7:1-17");
         controller.setConnectionCharset(Charset.forName("UTF-8"));
         controller.setSlaveId(3344L);
         controller.setDetectingEnable(false);
@@ -39,6 +41,7 @@ public class MysqlDumpTest {
         controller.setTsdbSpringXml("classpath:tsdb/h2-tsdb.xml");
         controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
         controller.setEventBlackFilter(new AviaterRegexFilter("canal_tsdb\\..*"));
+        controller.setIsGTIDMode(false);
         controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {
 
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
@@ -72,6 +75,7 @@ public class MysqlDumpTest {
                         System.out.println(" sql ----> " + rowChage.getSql());
                     }
 
+                    printXAInfo(rowChage.getPropsList());
                     for (RowData rowData : rowChage.getRowDatasList()) {
                         if (eventType == EventType.DELETE) {
                             print(rowData.getBeforeColumnsList());
@@ -118,4 +122,25 @@ public class MysqlDumpTest {
             System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
         }
     }
+
+    private void printXAInfo(List<Pair> pairs) {
+        if (pairs == null) {
+            return;
+        }
+
+        String xaType = null;
+        String xaXid = null;
+        for (Pair pair : pairs) {
+            String key = pair.getKey();
+            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
+                xaType = pair.getValue();
+            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
+                xaXid = pair.getValue();
+            }
+        }
+
+        if (xaType != null && xaXid != null) {
+            System.out.println(" ------> " + xaType + " " + xaXid);
+        }
+    }
 }

+ 30 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java

@@ -0,0 +1,30 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import com.alibaba.fastsql.sql.repository.SchemaObject;
+import com.alibaba.fastsql.sql.repository.SchemaRepository;
+import com.alibaba.fastsql.util.JdbcConstants;
+
+/**
+ * @author agapple 2018年6月7日 下午5:36:13
+ * @since 3.1.9
+ */
+public class FastsqlSchemaTest {
+
+    @Test
+    public void testSimple() throws FileNotFoundException, IOException {
+        SchemaRepository repository = new SchemaRepository(JdbcConstants.MYSQL);
+        String sql = "create table yushitai_test.card_record ( id bigint auto_increment) auto_increment=256 "
+                     + "; alter table yushitai_test.card_record add column customization_id bigint unsigned NOT NULL COMMENT 'TEST' ;"
+                     + "; rename table yushitai_test.card_record to yushitai_test._card_record_del;";
+        repository.console(sql);
+
+        repository.setDefaultSchema("yushitai_test");
+        SchemaObject table = repository.findTable("_card_record_del");
+        System.out.println(table.getStatement().toString());
+    }
+}

+ 40 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta_DDL_Test.java

@@ -0,0 +1,40 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.URL;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+
+/**
+ * @author agapple 2017年8月1日 下午7:15:54
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations = { "/tsdb/mysql-tsdb.xml" })
+public class MemoryTableMeta_DDL_Test {
+
+    @Test
+    public void test1() throws Throwable {
+        MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
+        URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
+        File dummyFile = new File(url.getFile());
+        File create = new File(dummyFile.getParent() + "/ddl", "ddl_test1.sql");
+        String sql = StringUtils.join(IOUtils.readLines(new FileInputStream(create)), "\n");
+        memoryTableMeta.apply(null, "test", sql, null);
+
+        TableMeta meta = memoryTableMeta.find("yushitai_test", "card_record");
+        System.out.println(meta);
+        Assert.assertNotNull(meta.getFieldMetaByName("customization_id"));
+
+        meta = memoryTableMeta.find("yushitai_test", "_card_record_gho");
+        Assert.assertNull(meta);
+    }
+}

+ 21 - 0
parse/src/test/resources/ddl/ddl_test1.sql

@@ -0,0 +1,21 @@
+create table yushitai_test.card_record (
+id bigint auto_increment,
+last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+hint varchar(64) charset ascii not null,
+value varchar(255) charset ascii not null,
+primary key(id),
+unique key hint_uidx(hint)
+) auto_increment=256 ;
+
+DROP TABLE IF EXISTS _card_record_gho /* generated by server */ ;
+DROP TABLE IF EXISTS _card_record_del /* generated by server */ ;
+
+create /* gh-ost */ table yushitai_test._card_record_gho like yushitai_test.card_record ;
+alter /* gh-ost */ table yushitai_test._card_record_gho add column customization_id bigint unsigned NOT NULL COMMENT 'TEST' ;
+
+create /* gh-ost */ table yushitai_test._card_record_del (
+id int auto_increment primary key
+) engine=InnoDB comment='ghost-cut-over-sentry' ;
+
+DROP TABLE IF EXISTS _card_record_del /* generated by server */ ;
+rename /* gh-ost */ table yushitai_test.card_record to yushitai_test._card_record_del, yushitai_test._card_record_gho to yushitai_test.card_record;

+ 1 - 1
pom.xml

@@ -254,7 +254,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_186</version>
+                <version>2.0.0_preview_371</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

Tiedoston diff-näkymää rajattu, sillä se on liian suuri
+ 603 - 507
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java


+ 3 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

@@ -191,6 +191,9 @@ enum EventType {
     CINDEX		= 		10;
     DINDEX 		= 		11;
     GTID        =       12;
+    /** XA **/
+    XACOMMIT    =       13;
+    XAROLLBACK  =		14;
 }
 
 /**数据库类型**/

Kaikkia tiedostoja ei voida näyttää, sillä liian monta tiedostoa muuttui tässä diffissä