Răsfoiți Sursa

fixes issue #4388 , support mysql 8.0.20+ TransactionPayloadLogEvent parse

jianghang.loujh 1 an în urmă
părinte
comite
252ca0279f

+ 8 - 0
dbsync/pom.xml

@@ -33,6 +33,14 @@
 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-api</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-compress</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.github.luben</groupId>
+			<artifactId>zstd-jni</artifactId>
+		</dependency>
 		<!-- test dependency -->
 		<dependency>
 			<groupId>junit</groupId>

+ 16 - 8
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java

@@ -2,7 +2,6 @@ package com.taobao.tddl.dbsync.binlog;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
@@ -30,7 +29,9 @@ public class LogBuffer {
     }
 
     public LogBuffer(byte[] buffer, final int origin, final int limit){
-        if (origin + limit > buffer.length) throw new IllegalArgumentException("capacity excceed: " + (origin + limit));
+        if (origin + limit > buffer.length) {
+            throw new IllegalArgumentException("capacity excceed: " + (origin + limit));
+        }
 
         this.buffer = buffer;
         this.origin = origin;
@@ -42,7 +43,9 @@ public class LogBuffer {
      * Return n bytes in this buffer.
      */
     public final LogBuffer duplicate(final int pos, final int len) {
-        if (pos + len > limit) throw new IllegalArgumentException("limit excceed: " + (pos + len));
+        if (pos + len > limit) {
+            throw new IllegalArgumentException("limit excceed: " + (pos + len));
+        }
 
         // XXX: Do momery copy avoid buffer modified.
         final int off = origin + pos;
@@ -54,8 +57,9 @@ public class LogBuffer {
      * Return next n bytes in this buffer.
      */
     public final LogBuffer duplicate(final int len) {
-        if (position + len > origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                                + (position + len - origin));
+        if (position + len > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
+        }
 
         // XXX: Do momery copy avoid buffer modified.
         final int end = position + len;
@@ -103,7 +107,9 @@ public class LogBuffer {
      * <tt>newPosition</tt> do not hold
      */
     public final LogBuffer position(final int newPosition) {
-        if (newPosition > limit || newPosition < 0) throw new IllegalArgumentException("limit excceed: " + newPosition);
+        if (newPosition > limit || newPosition < 0) {
+            throw new IllegalArgumentException("limit excceed: " + newPosition);
+        }
 
         this.position = origin + newPosition;
         return this;
@@ -116,8 +122,9 @@ public class LogBuffer {
      * @return This buffer
      */
     public final LogBuffer forward(final int len) {
-        if (position + len > origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                                + (position + len - origin));
+        if (position + len > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
+        }
 
         this.position += len;
         return this;
@@ -1675,4 +1682,5 @@ public class LogBuffer {
         }
         return "";
     }
+
 }

+ 10 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -26,6 +26,8 @@ public final class LogContext {
 
     private LogEvent                          gtidLogEvent; // save current gtid log event
 
+    private boolean                           iterateDecode = false;
+
     public LogContext(){
         this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
     }
@@ -82,4 +84,12 @@ public final class LogContext {
     public void setGtidLogEvent(LogEvent gtidLogEvent) {
         this.gtidLogEvent = gtidLogEvent;
     }
+
+    /**
+     * Tells whether events are currently being decoded out of a transaction
+     * payload instead of directly from the binlog stream.
+     *
+     * @return the value last passed to {@link #setIterateDecode(boolean)};
+     *         <tt>false</tt> until it has been set
+     */
+    public boolean isIterateDecode() {
+        return iterateDecode;
+    }
+
+    /**
+     * Marks whether events are being decoded out of a transaction payload.
+     *
+     * @param iterateDecode <tt>true</tt> while the embedded events of a payload
+     *            are decoded, <tt>false</tt> otherwise
+     */
+    public void setIterateDecode(boolean iterateDecode) {
+        this.iterateDecode = iterateDecode;
+    }
 }

+ 71 - 19
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -1,18 +1,19 @@
 package com.taobao.tddl.dbsync.binlog;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.BitSet;
+import java.util.List;
 
-import com.taobao.tddl.dbsync.binlog.event.*;
+import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
-import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.*;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.*;
 
 /**
  * Implements a binary-log decoder.
@@ -56,6 +57,8 @@ public final class LogDecoder {
         handleSet.set(flagIndex);
     }
 
+    private LogBuffer compressIterateBuffer;
+
     /**
      * Decoding an event from binary-log buffer.
      *
@@ -64,7 +67,6 @@ public final class LogDecoder {
      */
     public LogEvent decode(LogBuffer buffer, LogContext context) throws IOException {
         final int limit = buffer.limit();
-
         if (limit >= FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN) {
             LogHeader header = new LogHeader(buffer, context.getFormatDescription());
 
@@ -109,6 +111,58 @@ public final class LogDecoder {
         return null;
     }
 
+    /**
+     * Expands a transaction payload event into the events embedded in its
+     * payload.
+     * <p>
+     * A zstd-compressed payload is decompressed first, an uncompressed payload
+     * is read as is. Every embedded event is decoded with
+     * {@link #decode(LogBuffer, LogContext)} while the context is flagged as
+     * iterate-decoding; the flag is always cleared again before returning.
+     *
+     * @param event event to expand; only TRANSACTION_PAYLOAD_EVENT is handled
+     * @param context decode context, shared with
+     *            {@link #decode(LogBuffer, LogContext)}
+     * @return the embedded events in payload order, or an empty list when
+     *         <tt>event</tt> is not a transaction payload event
+     * @throws IOException if decompressing or decoding the payload fails
+     * @throws IllegalArgumentException if the payload is neither zstd-compressed
+     *             nor uncompressed
+     */
+    public List<LogEvent> processIterateDecode(LogEvent event, LogContext context) throws IOException {
+        List<LogEvent> events = Lists.newArrayList();
+        if (event.getHeader().getType() == LogEvent.TRANSACTION_PAYLOAD_EVENT) {
+            // iterate over the (possibly compressed) payload
+            TransactionPayloadLogEvent compressEvent = ((TransactionPayloadLogEvent) event);
+            LogBuffer iterateBuffer = null;
+            if (compressEvent.isCompressByZstd()) {
+                // try-with-resources: close the decompressor stream on success
+                // and on failure so that its resources are released
+                try (ZstdCompressorInputStream in = new ZstdCompressorInputStream(
+                    new ByteArrayInputStream(compressEvent.getPayload()))) {
+                    byte[] decodeBytes = IOUtils.toByteArray(in);
+                    iterateBuffer = new LogBuffer(decodeBytes, 0, decodeBytes.length);
+                }
+            } else if (compressEvent.isCompressByNone()) {
+                iterateBuffer = new LogBuffer(compressEvent.getPayload(), 0, compressEvent.getPayload().length);
+            } else {
+                throw new IllegalArgumentException("unkonow compresstype for " + event.getHeader().getLogFileName()
+                                                   + ":" + event.getHeader().getLogPos());
+            }
+
+            try {
+                context.setIterateDecode(true);
+                while (iterateBuffer.hasRemaining()) {// iterate
+                    LogEvent deEvent = decode(iterateBuffer, context);
+                    if (deEvent == null) {
+                        break;
+                    }
+
+                    // an embedded event carries logPos = 0, so it takes the
+                    // file name and position of the enclosing payload event
+                    deEvent.getHeader().setLogFileName(event.getHeader().getLogFileName());
+                    deEvent.getHeader().setLogPos(event.getHeader().getLogPos());
+                    // The eventLen of every embedded event has to be reset, because
+                    // the ack position update relies on logPos - eventLen.
+                    // Reason: an embedded event reports its uncompressed eventLen,
+                    // which does not match the eventLen in the physical binlog.
+                    // Risk: memory size accounting gets inflated, which affects
+                    // how many entries getBatch returns.
+                    deEvent.getHeader().setEventLen(event.getHeader().getEventLen());
+                    events.add(deEvent);
+                }
+            } finally {
+                context.setIterateDecode(false);
+            }
+        } else {
+            // TODO support mariadb compress binlog
+        }
+        return events;
+    }
+
     /**
      * Deserialize an event from buffer.
      *
@@ -127,8 +181,12 @@ public final class LogDecoder {
         }
 
         if (checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_OFF && checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_UNDEF) {
-            // remove checksum bytes
-            buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
+            if (context.isIterateDecode()) {
+                // transaction compress payload在主事件已经处理了checksum,遍历解析event忽略checksum处理
+            } else {
+                // remove checksum bytes
+                buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
+            }
         }
         GTIDSet gtidSet = context.getGtidSet();
         LogEvent gtidLogEvent = context.getGtidLogEvent();
@@ -360,16 +418,10 @@ public final class LogDecoder {
                 return event;
             }
             case LogEvent.TRANSACTION_PAYLOAD_EVENT: {
-                if (logger.isWarnEnabled()) {
-                    logger.warn("Skipping unsupported MySQL TRANSACTION_PAYLOAD_EVENT from: " + context.getLogPosition());
-                }
-                break;
-
-                // TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header,
-                // buffer, descriptionEvent);
-                // /* updating position in context */
-                // logPosition.position = header.getLogPos();
-                // return event;
+                TransactionPayloadLogEvent event = new TransactionPayloadLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                return event;
             }
             case LogEvent.VIEW_CHANGE_EVENT: {
                 ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);

+ 6 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java

@@ -62,6 +62,7 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
     public static final int   TRANSACTION_CONTEXT_HEADER_LEN      = 18;
     public static final int   VIEW_CHANGE_HEADER_LEN              = 52;
     public static final int   XA_PREPARE_HEADER_LEN               = 0;
+    public static final int   TRANSACTION_PAYLOAD_HEADER_LEN      = 0;
 
     public static final int   ANNOTATE_ROWS_HEADER_LEN            = 0;
     public static final int   BINLOG_CHECKPOINT_HEADER_LEN        = 4;
@@ -113,14 +114,14 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
         long calc = getVersionProduct();
         if (calc >= checksumVersionProduct) {
             /*
-             * the last bytes are the checksum alg desc and value (or value's
-             * room)
+             * the last bytes are the checksum alg desc and value (or value's room)
              */
             numberOfEventTypes -= BINLOG_CHECKSUM_ALG_DESC_LEN;
         }
 
-        if (logger.isInfoEnabled()) logger.info("common_header_len= " + commonHeaderLen + ", number_of_event_types= "
-                                                + numberOfEventTypes);
+        if (logger.isInfoEnabled()) {
+            logger.info("common_header_len= " + commonHeaderLen + ", number_of_event_types= " + numberOfEventTypes);
+        }
     }
 
     /** MySQL 5.0 format descriptions. */
@@ -212,6 +213,7 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
                 postHeaderLen[VIEW_CHANGE_EVENT - 1] = VIEW_CHANGE_HEADER_LEN;
                 postHeaderLen[XA_PREPARE_LOG_EVENT - 1] = XA_PREPARE_HEADER_LEN;
                 postHeaderLen[PARTIAL_UPDATE_ROWS_EVENT - 1] = ROWS_HEADER_LEN_V2;
+                postHeaderLen[TRANSACTION_PAYLOAD_EVENT - 1] = TRANSACTION_PAYLOAD_HEADER_LEN;
 
                 // mariadb 10
                 postHeaderLen[ANNOTATE_ROWS_EVENT - 1] = ANNOTATE_ROWS_HEADER_LEN;

+ 6 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java

@@ -293,6 +293,12 @@ public final class LogHeader {
     public void setLogFileName(String logFileName) {
         this.logFileName = logFileName;
     }
+    /**
+     * Overrides the log position stored in this header.
+     *
+     * @param logPos the position to store
+     */
+    public void setLogPos(long logPos) {
+        this.logPos = logPos;
+    }
+    /**
+     * Overrides the event length stored in this header.
+     *
+     * @param eventLen the event length to store
+     */
+    public void setEventLen(int eventLen) {
+        this.eventLen = eventLen;
+    }
 
     private void processCheckSum(LogBuffer buffer) {
         if (checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_OFF && checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_UNDEF) {

+ 80 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TransactionPayloadLogEvent.java

@@ -5,12 +5,91 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
 
 /**
  * @author agapple 2022年5月23日 下午7:05:39
- * @version 1.1.6
+ * @version 1.1.7
  * @since mysql 8.0.20
  */
 public class TransactionPayloadLogEvent extends LogEvent {
 
+    public static final short COMPRESSION_TYPE_MIN_LENGTH         = 1;
+    public static final short COMPRESSION_TYPE_MAX_LENGTH         = 9;
+    public static final short PAYLOAD_SIZE_MIN_LENGTH             = 0;
+    public static final short PAYLOAD_SIZE_MAX_LENGTH             = 9;
+    public static final short UNCOMPRESSED_SIZE_MIN_LENGTH        = 0;
+    public static final short UNCOMPRESSED_SIZE_MAX_LENGTH        = 9;
+    public static final int   MAX_DATA_LENGTH                     = COMPRESSION_TYPE_MAX_LENGTH
+                                                                    + PAYLOAD_SIZE_MAX_LENGTH
+                                                                    + UNCOMPRESSED_SIZE_MAX_LENGTH;
+
+    /** Marks the end of the payload header. */
+    public static final int   OTW_PAYLOAD_HEADER_END_MARK         = 0;
+
+    /** The payload field */
+    public static final int   OTW_PAYLOAD_SIZE_FIELD              = 1;
+
+    /** The compression type field */
+    public static final int   OTW_PAYLOAD_COMPRESSION_TYPE_FIELD  = 2;
+
+    /** The uncompressed size field */
+    public static final int   OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3;
+
+    /* ZSTD compression. */
+    public final static int   COMPRESS_TYPE_ZSTD                  = 0;
+    /* No compression. */
+    public final static int   COMPRESS_TYPE_NONE                  = 255;
+
+    private long              m_compression_type                  = COMPRESS_TYPE_NONE;
+    private long              m_payload_size;
+    private long              m_uncompressed_size;
+    private byte[]            m_payload;
+
+    /**
+     * Parses the payload header fields and copies the payload bytes out of
+     * <tt>buffer</tt>.
+     * <p>
+     * The header is read as a sequence of (type, length, value) fields that
+     * ends at {@link #OTW_PAYLOAD_HEADER_END_MARK} or at the end of the buffer.
+     * Fields of an unknown type are skipped by their declared length. When no
+     * uncompressed size was read, it defaults to the payload size.
+     *
+     * @param header common header of this event
+     * @param buffer buffer holding the event, addressed from its start
+     * @param descriptionEvent supplies the common header length
+     * @throws IllegalArgumentException if the declared payload size is negative
+     *             or does not fit into an <tt>int</tt>
+     */
     public TransactionPayloadLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
         super(header);
+
+        // the payload header fields start right after the common header
+        buffer.position(descriptionEvent.getCommonHeaderLen());
+        while (buffer.hasRemaining()) {
+            final long type = buffer.getPackedLong(); // type
+            if (type == OTW_PAYLOAD_HEADER_END_MARK) {
+                break;
+            }
+
+            final long length = buffer.getPackedLong(); // length
+            // compare on the long value: narrowing it to int for a switch could
+            // alias an unknown field type onto one of the known ids
+            if (type == OTW_PAYLOAD_SIZE_FIELD) {
+                m_payload_size = buffer.getPackedLong(); // value
+            } else if (type == OTW_PAYLOAD_COMPRESSION_TYPE_FIELD) {
+                m_compression_type = buffer.getPackedLong(); // value
+            } else if (type == OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD) {
+                m_uncompressed_size = buffer.getPackedLong(); // value
+            } else {
+                // unknown field: skip over its value
+                buffer.forward((int) length);
+            }
+        }
+
+        if (m_uncompressed_size == 0) {
+            m_uncompressed_size = m_payload_size;
+        }
+        // reject sizes that would be corrupted by the narrowing cast below
+        if (m_payload_size < 0 || m_payload_size > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("illegal transaction payload size: " + m_payload_size);
+        }
+        m_payload = buffer.getData((int) m_payload_size);
+    }
+
+    /**
+     * @return <tt>true</tt> if the payload is compressed with zstd
+     */
+    public boolean isCompressByZstd() {
+        return m_compression_type == COMPRESS_TYPE_ZSTD;
+    }
+
+    /**
+     * @return <tt>true</tt> if the payload is not compressed, which is also the
+     *         case when the event carried no compression type field
+     */
+    public boolean isCompressByNone() {
+        return m_compression_type == COMPRESS_TYPE_NONE;
+    }
+
+    /**
+     * @return the payload bytes exactly as read from the event, i.e. still
+     *         compressed when {@link #isCompressByZstd()} is <tt>true</tt>; the
+     *         internal array is returned without copying
+     */
+    public byte[] getPayload() {
+        return m_payload;
     }
 }

+ 48 - 44
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

@@ -4,20 +4,14 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
+import java.util.List;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.*;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
-@Ignore
+
 public class DirectLogFetcherTest extends BaseLogFetcherTest {
 
     @Test
@@ -30,47 +24,15 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
             statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
             statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
 
-            fecther.open(connection, "mysql-bin.000007", 89797036L, 2);
+            fecther.open(connection, "binlog.000002", 4L, 1);
 
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
             while (fecther.fetch()) {
                 LogEvent event = decoder.decode(fecther, context);
-                int eventType = event.getHeader().getType();
-                switch (eventType) {
-                    case LogEvent.ROTATE_EVENT:
-                        binlogFileName = ((RotateLogEvent) event).getFilename();
-                        break;
-                    case LogEvent.WRITE_ROWS_EVENT_V1:
-                    case LogEvent.WRITE_ROWS_EVENT:
-                        parseRowsEvent((WriteRowsLogEvent) event);
-                        break;
-                    case LogEvent.UPDATE_ROWS_EVENT_V1:
-                    case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
-                    case LogEvent.UPDATE_ROWS_EVENT:
-                        parseRowsEvent((UpdateRowsLogEvent) event);
-                        break;
-                    case LogEvent.DELETE_ROWS_EVENT_V1:
-                    case LogEvent.DELETE_ROWS_EVENT:
-                        parseRowsEvent((DeleteRowsLogEvent) event);
-                        break;
-                    case LogEvent.QUERY_EVENT:
-                        parseQueryEvent((QueryLogEvent) event);
-                        break;
-                    case LogEvent.ROWS_QUERY_LOG_EVENT:
-                        parseRowsQueryEvent((RowsQueryLogEvent) event);
-                        break;
-                    case LogEvent.ANNOTATE_ROWS_EVENT:
-                        parseAnnotateRowsEvent((AnnotateRowsEvent) event);
-                        break;
-                    case LogEvent.XID_EVENT:
-                        parseXidEvent((XidLogEvent) event);
-                        break;
-                    default:
-                        break;
-                }
+                processEvent(event, decoder, context);
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
         } finally {
@@ -82,4 +44,46 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
         }
 
     }
+
+    /**
+     * Hands one decoded event to the parse helper that matches its type.
+     * <p>
+     * A TRANSACTION_PAYLOAD_EVENT is expanded through the decoder and each
+     * embedded event is passed through this method again. Event types without
+     * a matching helper are ignored.
+     *
+     * @param event the decoded event
+     * @param decoder decoder used to expand transaction payload events
+     * @param context decode context handed on to the decoder
+     */
+    public void processEvent(LogEvent event, LogDecoder decoder, LogContext context) throws Throwable {
+        switch (event.getHeader().getType()) {
+            case LogEvent.ROTATE_EVENT: {
+                RotateLogEvent rotate = (RotateLogEvent) event;
+                binlogFileName = rotate.getFilename();
+                return;
+            }
+            case LogEvent.WRITE_ROWS_EVENT_V1:
+            case LogEvent.WRITE_ROWS_EVENT: {
+                parseRowsEvent((WriteRowsLogEvent) event);
+                return;
+            }
+            case LogEvent.UPDATE_ROWS_EVENT_V1:
+            case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
+            case LogEvent.UPDATE_ROWS_EVENT: {
+                parseRowsEvent((UpdateRowsLogEvent) event);
+                return;
+            }
+            case LogEvent.DELETE_ROWS_EVENT_V1:
+            case LogEvent.DELETE_ROWS_EVENT: {
+                parseRowsEvent((DeleteRowsLogEvent) event);
+                return;
+            }
+            case LogEvent.QUERY_EVENT: {
+                parseQueryEvent((QueryLogEvent) event);
+                return;
+            }
+            case LogEvent.ROWS_QUERY_LOG_EVENT: {
+                parseRowsQueryEvent((RowsQueryLogEvent) event);
+                return;
+            }
+            case LogEvent.ANNOTATE_ROWS_EVENT: {
+                parseAnnotateRowsEvent((AnnotateRowsEvent) event);
+                return;
+            }
+            case LogEvent.XID_EVENT: {
+                parseXidEvent((XidLogEvent) event);
+                return;
+            }
+            case LogEvent.TRANSACTION_PAYLOAD_EVENT: {
+                // expand the payload and dispatch every embedded event in order
+                for (LogEvent embedded : decoder.processIterateDecode(event, context)) {
+                    processEvent(embedded, decoder, context);
+                }
+                return;
+            }
+            default:
+                // no helper for this event type
+                return;
+        }
+    }
 }

+ 6 - 1
pom.xml

@@ -184,7 +184,12 @@
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-compress</artifactId>
-                <version>1.9</version>
+                <version>1.22</version>
+            </dependency>
+            <dependency>
+                <groupId>com.github.luben</groupId>
+                <artifactId>zstd-jni</artifactId>
+                <version>1.5.2-5</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.httpcomponents</groupId>