Bläddra i källkod

fixed issue #4225 , support mysql version >= 8.0.26 heartbeat v2

jianghang.loujh 2 år sedan
förälder
incheckning
7585d9c98b

+ 6 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -418,6 +418,12 @@ public final class LogDecoder {
                 logPosition.position = header.getLogPos();
                 return event;
             }
+            case LogEvent.HEARTBEAT_LOG_EVENT_V2: {
+                HeartbeatV2LogEvent event = new HeartbeatV2LogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                return event;
+            }
             default:
                 /*
                  * Create an object of Ignorable_log_event for unrecognized

+ 5 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java

@@ -178,7 +178,10 @@ public abstract class LogEvent {
     /* mysql 8.0.20 */
     public static final int    TRANSACTION_PAYLOAD_EVENT                = 40;
 
-    public static final int    MYSQL_ENUM_END_EVENT                     = 41;
+    /* mysql 8.0.26 */
+    public static final int    HEARTBEAT_LOG_EVENT_V2                   = 41;
+
+    public static final int    MYSQL_ENUM_END_EVENT                     = 42;
 
     // mariaDb 5.5.34
     /* New MySQL/Sun events are to be added right above this comment */
@@ -358,6 +361,7 @@ public abstract class LogEvent {
             case INCIDENT_EVENT:
                 return "Incident";
             case HEARTBEAT_LOG_EVENT:
+            case HEARTBEAT_LOG_EVENT_V2:
                 return "Heartbeat";
             case IGNORABLE_LOG_EVENT:
                 return "Ignorable";

+ 40 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/HeartbeatV2LogEvent.java

@@ -0,0 +1,40 @@
+package com.taobao.tddl.dbsync.binlog.event;
+
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
+/**
+ * <pre>
+ *   Replication event to ensure to replica that source is alive.
+ *   The event is originated by source's dump thread and sent straight to
+ *   replica without being logged. Slave itself does not store it in relay log
+ *   but rather uses a data for immediate checks and throws away the event.
+ *   Two members of the class m_log_filename and m_log_position comprise
+ *   @see the rpl_event_coordinates instance. The coordinates that a heartbeat
+ *   instance carries correspond to the last event source has sent from
+ *   its binlog.
+ *   Also this event will be generated only for the source server with
+ *   version > 8.0.26
+ * </pre>
+ * 
+ * @author jianghang 2022-09-01 下午16:36:29
+ * @version 1.1.6
+ * @since mysql 8.0.26
+ */
+public class HeartbeatV2LogEvent extends LogEvent {
+
+    private byte[]          payload;
+
+    public HeartbeatV2LogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
+        super(header);
+
+        final int commonHeaderLen = descriptionEvent.commonHeaderLen;
+        int payloadLenth = buffer.limit() - commonHeaderLen;
+        // see : https://github.com/mysql/mysql-server/commit/59e590738a772b74ad50c4a57c86aaa1bc6501c7#diff-184e9a7d8a58f080974e475d4199fe5c6da5518c8a2811cc5df5988c8f9e9797
+        payload = buffer.getData(payloadLenth);
+    }
+
+    public byte[] getPayload() {
+        return payload;
+    }
+}

+ 12 - 16
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -12,6 +12,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.taobao.tddl.dbsync.binlog.event.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -40,23 +41,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry.Type;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.google.protobuf.ByteString;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
-import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.HeartbeatLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.LogHeader;
-import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RandLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer;
-import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
-import com.taobao.tddl.dbsync.binlog.event.UnknownLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
@@ -147,6 +132,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 return parseGTIDLogEvent((GtidLogEvent) logEvent);
             case LogEvent.HEARTBEAT_LOG_EVENT:
                 return parseHeartbeatLogEvent((HeartbeatLogEvent) logEvent);
+            case LogEvent.HEARTBEAT_LOG_EVENT_V2:
+                return parseHeartbeatV2LogEvent((HeartbeatV2LogEvent) logEvent);
             case LogEvent.GTID_EVENT:
             case LogEvent.GTID_LIST_EVENT:
                 return parseMariaGTIDLogEvent(logEvent);
@@ -173,6 +160,15 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         return entryBuilder.build();
     }
 
+    private Entry parseHeartbeatV2LogEvent(HeartbeatV2LogEvent logEvent) {
+        Header.Builder headerBuilder = Header.newBuilder();
+        headerBuilder.setEventType(EventType.MHEARTBEAT);
+        Entry.Builder entryBuilder = Entry.newBuilder();
+        entryBuilder.setHeader(headerBuilder.build());
+        entryBuilder.setEntryType(EntryType.HEARTBEAT);
+        return entryBuilder.build();
+    }
+
     private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
         LogHeader logHeader = logEvent.getHeader();
         Pair.Builder builder = Pair.newBuilder();