Forráskód Böngészése

support mysql 8.0 transaction_payload_event

jianghang.loujh 3 éve
szülő
commit
99aefaa118

+ 1 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -11,7 +11,7 @@ canal.conf:
   flatMessage: true
   zookeeperHosts:
   syncBatchSize: 1000
-  retries: 0
+  retries: -1
   timeout:
   accessKey:
   secretKey:

+ 8 - 33
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -3,42 +3,11 @@ package com.taobao.tddl.dbsync.binlog;
 import java.io.IOException;
 import java.util.BitSet;
 
+import com.taobao.tddl.dbsync.binlog.event.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-import com.taobao.tddl.dbsync.binlog.event.AppendBlockLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.BeginLoadQueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.CreateFileLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.DeleteFileLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.ExecuteLoadLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.ExecuteLoadQueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.HeartbeatLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.IgnorableLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.IncidentLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.LoadLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.LogHeader;
-import com.taobao.tddl.dbsync.binlog.event.PreviousGtidsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RandLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.StartLogEventV3;
-import com.taobao.tddl.dbsync.binlog.event.StopLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.TransactionContextLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.UnknownLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.ViewChangeEvent;
-import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.XaPrepareLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
@@ -386,6 +355,12 @@ public final class LogDecoder {
                 logPosition.position = header.getLogPos();
                 return event;
             }
+            case LogEvent.TRANSACTION_PAYLOAD_EVENT: {
+                TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                return event;
+            }
             case LogEvent.VIEW_CHANGE_EVENT: {
                 ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
@@ -443,7 +418,7 @@ public final class LogDecoder {
                 logPosition.position = header.getLogPos();
                 return event;
             }
-            default:
+            com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService.lambda    default:
                 /*
                  * Create an object of Ignorable_log_event for unrecognized
                  * sub-class. So that SLAVE SQL THREAD will only update the

+ 14 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java

@@ -175,6 +175,11 @@ public abstract class LogEvent {
      */
     public static final int    PARTIAL_UPDATE_ROWS_EVENT                = 39;
 
+    /* mysql 8.0.20 */
+    public static final int    TRANSACTION_PAYLOAD_EVENT                = 40;
+
+    public static final int    MYSQL_ENUM_END_EVENT                     = 41;
+
     // mariaDb 5.5.34
     /* New MySQL/Sun events are to be added right above this comment */
     public static final int    MYSQL_EVENTS_END                         = 49;
@@ -372,8 +377,16 @@ public abstract class LogEvent {
                 return "Previous_gtids";
             case PARTIAL_UPDATE_ROWS_EVENT:
                 return "Update_rows_partial";
+            case TRANSACTION_CONTEXT_EVENT :
+                return "Transaction_context";
+            case VIEW_CHANGE_EVENT :
+                return "view_change";
+            case XA_PREPARE_LOG_EVENT :
+                return "Xa_prepare";
+            case TRANSACTION_PAYLOAD_EVENT :
+                return "transaction_payload";
             default:
-                return "Unknown"; /* impossible */
+                return "Unknown type:" + type;
         }
     }
 

+ 16 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TransactionPayloadLogEvent.java

@@ -0,0 +1,16 @@
+package com.taobao.tddl.dbsync.binlog.event;
+
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
+/**
+ * @author agapple 2022年5月23日 下午7:05:39
+ * @version 1.1.6
+ * @since mysql 8.0.20
+ */
+public class TransactionPayloadLogEvent extends LogEvent {
+
+    public TransactionPayloadLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
+        super(header);
+    }
+}