Selaa lähdekoodia

将当前的gtid,sequence no 和last committed信息增加至Entry(trx begin/end, rowdata) Header 中,客户端可从property中获取相关值

winger 6 vuotta sitten
vanhempi
commit
1516cf779d

+ 11 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -5,6 +5,7 @@ import java.util.Map;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
 
 /**
@@ -23,6 +24,8 @@ public final class LogContext {
 
     private GTIDSet                           gtidSet;
 
+    private GtidLogEvent                      gtidLogEvent; // save current gtid log event
+
     public LogContext(){
         this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
     }
@@ -71,4 +74,12 @@ public final class LogContext {
     public void setGtidSet(GTIDSet gtidSet) {
         this.gtidSet = gtidSet;
     }
+
+    public GtidLogEvent getGtidLogEvent() {
+        return gtidLogEvent;
+    }
+
+    public void setGtidLogEvent(GtidLogEvent gtidLogEvent) {
+        this.gtidLogEvent = gtidLogEvent;
+    }
 }

+ 13 - 11
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -161,19 +161,20 @@ public final class LogDecoder {
             buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
         }
         GTIDSet gtidSet = context.getGtidSet();
+        GtidLogEvent gtidLogEvent = context.getGtidLogEvent();
         switch (header.getType()) {
             case LogEvent.QUERY_EVENT: {
                 QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.XID_EVENT: {
                 XidLogEvent event = new XidLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.TABLE_MAP_EVENT: {
@@ -271,14 +272,14 @@ public final class LogDecoder {
                 RandLogEvent event = new RandLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.USER_VAR_EVENT: {
                 UserVarLogEvent event = new UserVarLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.FORMAT_DESCRIPTION_EVENT: {
@@ -329,7 +330,6 @@ public final class LogDecoder {
                 HeartbeatLogEvent event = new HeartbeatLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
                 return event;
             }
             case LogEvent.IGNORABLE_LOG_EVENT: {
@@ -342,7 +342,7 @@ public final class LogDecoder {
                 RowsQueryLogEvent event = new RowsQueryLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.WRITE_ROWS_EVENT: {
@@ -350,7 +350,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.UPDATE_ROWS_EVENT: {
@@ -358,7 +358,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.DELETE_ROWS_EVENT: {
@@ -366,7 +366,7 @@ public final class LogDecoder {
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
                 event.fillTable(context);
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.GTID_LOG_EVENT:
@@ -377,8 +377,10 @@ public final class LogDecoder {
                 if (gtidSet != null) {
                     gtidSet.update(event.getGtidStr());
                     // update latest gtid
-                    header.putGtidStr(gtidSet);
+                    header.putGtid(gtidSet, event);
                 }
+                // update current gtid event to context
+                context.setGtidLogEvent(event);
                 return event;
             }
             case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {
@@ -409,7 +411,7 @@ public final class LogDecoder {
                 AnnotateRowsEvent event = new AnnotateRowsEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                header.putGtidStr(context.getGtidSet());
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
             case LogEvent.BINLOG_CHECKPOINT_EVENT: {

+ 30 - 5
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java

@@ -4,6 +4,9 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * The Common-Header, documented in the table @ref Table_common_header "below",
  * always has the same form and length within one version of MySQL. Each event
@@ -121,7 +124,12 @@ public final class LogHeader {
      */
     protected String    logFileName;
 
-    protected String    gtidStr;
+    protected Map<String, String> gtidMap = new HashMap<>();
+
+    private static final String CURRENT_GTID_STRING = "curt_gtid";
+    private static final String CURRENT_GTID_SN = "curt_gtid_sn";
+    private static final String CURRENT_GTID_LAST_COMMIT = "curt_gtid_lct";
+    private static final String GTID_SET_STRING = "gtid_str";
 
     /* for Start_event_v3 */
     public LogHeader(final int type){
@@ -292,13 +300,30 @@ public final class LogHeader {
         }
     }
 
-    public String getGtidStr() {
-        return gtidStr;
+    public String getGtidSetStr() {
+        return gtidMap.get(GTID_SET_STRING);
     }
 
-    public void putGtidStr(GTIDSet gtidSet) {
+    public String getCurrentGtid() {
+        return gtidMap.get(CURRENT_GTID_STRING);
+    }
+
+    public String getCurrentGtidSn() {
+        return gtidMap.get(CURRENT_GTID_SN);
+    }
+
+    public String getCurrentGtidLastCommit() {
+        return gtidMap.get(CURRENT_GTID_LAST_COMMIT);
+    }
+
+    public void putGtid(GTIDSet gtidSet, GtidLogEvent event) {
         if (gtidSet != null) {
-            this.gtidStr = gtidSet.toString();
+            gtidMap.put(GTID_SET_STRING, gtidSet.toString());
+            if (event != null) {
+                gtidMap.put(CURRENT_GTID_STRING, event.getGtidStr());
+                gtidMap.put(CURRENT_GTID_SN, String.valueOf(event.getSequenceNumber()));
+                gtidMap.put(CURRENT_GTID_LAST_COMMIT, String.valueOf(event.getLastCommitted()));
+            }
         }
     }
 }

+ 18 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -545,7 +545,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 rowsCount++;
                 rowChangeBuider.addRowDatas(rowDataBuilder.build());
             }
-
             TableMapLogEvent table = event.getTable();
             Header header = createHeader(event.getHeader(),
                 table.getDbName(),
@@ -834,8 +833,24 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         }
         headerBuilder.setEventLength(logHeader.getEventLen());
         // enable gtid position
-        if (StringUtils.isNotEmpty(logHeader.getGtidStr())) {
-            headerBuilder.setGtid(logHeader.getGtidStr());
+        if (StringUtils.isNotEmpty(logHeader.getGtidSetStr())) {
+            headerBuilder.setGtid(logHeader.getGtidSetStr());
+        }
+        // add current gtid
+        if (StringUtils.isNotEmpty(logHeader.getCurrentGtid())) {
+            Pair pair = createSpecialPair("curtGtid", logHeader.getCurrentGtid());
+            headerBuilder.addProps(pair);
+        }
+        // add current gtid sequence no
+        if (StringUtils.isNotEmpty(logHeader.getCurrentGtidSn())) {
+            Pair pair = createSpecialPair("curtGtidSn", logHeader.getCurrentGtidSn());
+            headerBuilder.addProps(pair);
+        }
+
+        // add current gtid last committed
+        if (StringUtils.isNotEmpty(logHeader.getCurrentGtidLastCommit())) {
+            Pair pair = createSpecialPair("curtGtidLct", logHeader.getCurrentGtidLastCommit());
+            headerBuilder.addProps(pair);
         }
 
         // add rowsCount suppport