Browse Source

append gtid lastCommitted and sequenceNumber support

winger 7 năm trước cách đây
mục cha
commit
89726a6365

+ 19 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java

@@ -17,10 +17,13 @@ public class GtidLogEvent extends LogEvent {
     public static final int ENCODED_FLAG_LENGTH = 1;
     // / Length of SID in event encoding
     public static final int ENCODED_SID_LENGTH  = 16;
+    public static final int LOGICAL_TIMESTAMP_TYPE_CODE  = 2;
 
     private boolean         commitFlag;
     private UUID            sid;
     private long            gno;
+    private Long            lastCommitted;
+    private Long            sequenceNumber;
 
     public GtidLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
         super(header);
@@ -40,6 +43,14 @@ public class GtidLogEvent extends LogEvent {
 
         gno = buffer.getLong64();
 
+        // support gtid lastCommitted and sequenceNumber
+        // 42 = 1+16+8+1+8+8
+        if (buffer.capacity() > 42 && buffer.getUint8() == LOGICAL_TIMESTAMP_TYPE_CODE) {
+            lastCommitted = buffer.getLong64();
+            sequenceNumber = buffer.getLong64();
+        }
+
+
         // ignore gtid info read
         // sid.copy_from((uchar *)ptr_buffer);
         // ptr_buffer+= ENCODED_SID_LENGTH;
@@ -62,4 +73,12 @@ public class GtidLogEvent extends LogEvent {
     public long getGno() {
         return gno;
     }
+
+    public Long getLastCommitted() {
+        return lastCommitted;
+    }
+
+    public Long getSequenceNumber() {
+        return sequenceNumber;
+    }
 }

+ 6 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -167,6 +167,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         if (gtidSet != null) {
             gtidSet.update(value);
         }
+        if (logEvent.getLastCommitted() != null) {
+            builder.setKey("lastCommitted");
+            builder.setValue(String.valueOf(logEvent.getLastCommitted()));
+            builder.setKey("sequenceNumber");
+            builder.setValue(String.valueOf(logEvent.getSequenceNumber()));
+        }
 
         Header header = createHeader(logHeader, "", "", EventType.GTID);
         return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());