Browse Source

fix bug: 并行解析模式下,开启gtid 会导致解析错误

winger 6 years ago
parent
commit
c1f92858a2

+ 5 - 0
dbsync/pom.xml

@@ -11,6 +11,11 @@
 	<packaging>jar</packaging>
 	<name>canal dbsync module for otter ${project.version}</name>
 	<dependencies>
+		<dependency>
+			<groupId>com.alibaba.otter</groupId>
+			<artifactId>canal.parse.driver</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 		<!-- log -->
 		<dependency>
 			<groupId>ch.qos.logback</groupId>

+ 20 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -3,7 +3,10 @@ package com.taobao.tddl.dbsync.binlog;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
 
 /**
@@ -20,6 +23,8 @@ public final class LogContext {
 
     private LogPosition                       logPosition;
 
+    private GTIDSet                           gtidSet;
+
     public LogContext(){
         this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
     }
@@ -60,4 +65,19 @@ public final class LogContext {
         formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
         mapOfTable.clear();
     }
+
+    public final void putGtid(GtidLogEvent logEvent) {
+        if (logEvent != null) {
+            String gtid = logEvent.getSid().toString() + ":" + logEvent.getGno();
+            if (gtidSet == null) {
+                gtid = logEvent.getSid().toString() + ":1-" + logEvent.getGno();
+                gtidSet = MysqlGTIDSet.parse(gtid);
+            }
+            gtidSet.update(gtid);
+        }
+    }
+
+    public GTIDSet getGtidSet() {
+        return gtidSet;
+    }
 }

+ 13 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -3,6 +3,7 @@ package com.taobao.tddl.dbsync.binlog;
 import java.io.IOException;
 import java.util.BitSet;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -159,18 +160,19 @@ public final class LogDecoder {
             // remove checksum bytes
             buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
         }
-
         switch (header.getType()) {
             case LogEvent.QUERY_EVENT: {
                 QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.XID_EVENT: {
                 XidLogEvent event = new XidLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.TABLE_MAP_EVENT: {
@@ -268,12 +270,14 @@ public final class LogDecoder {
                 RandLogEvent event = new RandLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.USER_VAR_EVENT: {
                 UserVarLogEvent event = new UserVarLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.FORMAT_DESCRIPTION_EVENT: {
@@ -324,6 +328,7 @@ public final class LogDecoder {
                 HeartbeatLogEvent event = new HeartbeatLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.IGNORABLE_LOG_EVENT: {
@@ -336,6 +341,7 @@ public final class LogDecoder {
                 RowsQueryLogEvent event = new RowsQueryLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.WRITE_ROWS_EVENT: {
@@ -343,6 +349,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.UPDATE_ROWS_EVENT: {
@@ -350,6 +357,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.DELETE_ROWS_EVENT: {
@@ -357,6 +365,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.GTID_LOG_EVENT:
@@ -364,6 +373,8 @@ public final class LogDecoder {
                 GtidLogEvent event = new GtidLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                // update latest gtid
+                context.putGtid(event);
                 return event;
             }
             case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {
@@ -394,6 +405,7 @@ public final class LogDecoder {
                 AnnotateRowsEvent event = new AnnotateRowsEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.BINLOG_CHECKPOINT_EVENT: {

+ 13 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java

@@ -1,5 +1,6 @@
 package com.taobao.tddl.dbsync.binlog.event;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
@@ -120,6 +121,8 @@ public final class LogHeader {
      */
     protected String    logFileName;
 
+    protected String    gtidStr;
+
     /* for Start_event_v3 */
     public LogHeader(final int type){
         this.type = type;
@@ -288,4 +291,14 @@ public final class LogHeader {
             crc = buffer.getUint32(eventLen - LogEvent.BINLOG_CHECKSUM_LEN);
         }
     }
+
+    public String getGtidStr() {
+        return gtidStr;
+    }
+
+    public void putGtidStr(GTIDSet gtidSet) {
+        if (gtidSet != null) {
+            this.gtidStr = gtidSet.toString();
+        }
+    }
 }

+ 4 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -170,15 +170,13 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         return entryBuilder.build();
     }
 
-    private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
+    public Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
         LogHeader logHeader = logEvent.getHeader();
         String value = logEvent.getSid().toString() + ":" + logEvent.getGno();
         Pair.Builder builder = Pair.newBuilder();
         builder.setKey("gtid");
         builder.setValue(value);
-        if (gtidSet != null) {
-            gtidSet.update(value);
-        }
+
         if (logEvent.getLastCommitted() != null) {
             builder.setKey("lastCommitted");
             builder.setValue(String.valueOf(logEvent.getLastCommitted()));
@@ -845,9 +843,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         }
         headerBuilder.setEventLength(logHeader.getEventLen());
         // enable gtid position
-        if (gtidSet != null) {
-            String gtid = gtidSet.toString();
-            headerBuilder.setGtid(gtid);
+        if (StringUtils.isNotEmpty(logHeader.getGtidStr())) {
+            headerBuilder.setGtid(logHeader.getGtidStr());
         }
 
         // add rowsCount suppport