瀏覽代碼

fixes issue #4388 , support mariadb 10.x log_bin_compress

jianghang.loujh 1 年之前
父節點
當前提交
f13cc4ee9f
共有 23 個文件被更改,包括 673 次插入343 次删除
  1. 230 102
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java
  2. 29 21
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
  3. 7 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/DeleteRowsLogEvent.java
  4. 9 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java
  5. 4 4
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java
  6. 13 4
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java
  7. 3 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RotateLogEvent.java
  8. 22 4
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java
  9. 8 3
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/UpdateRowsLogEvent.java
  10. 7 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/WriteRowsLogEvent.java
  11. 12 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/BinlogCheckPointLogEvent.java
  12. 1 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/DeleteRowsCompressLogEvent.java
  13. 1 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/QueryCompressedLogEvent.java
  14. 1 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/UpdateRowsCompressLogEvent.java
  15. 1 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/WriteRowsCompressLogEvent.java
  16. 6 2
      dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java
  17. 45 36
      dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/FileLogFetcherTest.java
  18. 14 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  19. 12 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  20. 149 65
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  21. 55 50
      parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java
  22. 2 2
      parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java
  23. 42 36
      parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java

+ 230 - 102
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java

@@ -1,5 +1,6 @@
 package com.taobao.tddl.dbsync.binlog;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigDecimal;
@@ -9,6 +10,9 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.BitSet;
 
+import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
+import org.apache.commons.io.IOUtils;
+
 /**
  * TODO: Document Me!!
  * 
@@ -216,7 +220,9 @@ public class LogBuffer {
      * Return 8-bit signed int from buffer.
      */
     public final int getInt8(final int pos) {
-        if (pos >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: " + pos);
+        if (pos >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + pos);
+        }
 
         return buffer[origin + pos];
     }
@@ -225,7 +231,9 @@ public class LogBuffer {
      * Return next 8-bit signed int from buffer.
      */
     public final int getInt8() {
-        if (position >= origin + limit) throw new IllegalArgumentException("limit excceed: " + (position - origin));
+        if (position >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin));
+        }
 
         return buffer[position++];
     }
@@ -234,7 +242,9 @@ public class LogBuffer {
      * Return 8-bit unsigned int from buffer.
      */
     public final int getUint8(final int pos) {
-        if (pos >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: " + pos);
+        if (pos >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + pos);
+        }
 
         return 0xff & buffer[origin + pos];
     }
@@ -243,7 +253,9 @@ public class LogBuffer {
      * Return next 8-bit unsigned int from buffer.
      */
     public final int getUint8() {
-        if (position >= origin + limit) throw new IllegalArgumentException("limit excceed: " + (position - origin));
+        if (position >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin));
+        }
 
         return 0xff & buffer[position++];
     }
@@ -256,8 +268,9 @@ public class LogBuffer {
     public final int getInt16(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 1 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 1)));
+        if (pos + 1 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 1)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position]) | ((buf[position + 1]) << 8);
@@ -269,8 +282,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - sint2korr
      */
     public final int getInt16() {
-        if (position + 1 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 1));
+        if (position + 1 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 1));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position++]) | ((buf[position++]) << 8);
@@ -284,8 +298,9 @@ public class LogBuffer {
     public final int getUint16(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 1 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 1)));
+        if (pos + 1 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 1)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position]) | ((0xff & buf[position + 1]) << 8);
@@ -297,8 +312,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - uint2korr
      */
     public final int getUint16() {
-        if (position + 1 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 1));
+        if (position + 1 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 1));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position++]) | ((0xff & buf[position++]) << 8);
@@ -311,9 +327,9 @@ public class LogBuffer {
      */
     public final int getBeInt16(final int pos) {
         final int position = origin + pos;
-
-        if (pos + 1 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 1)));
+        if (pos + 1 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 1)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position + 1]) | ((buf[position]) << 8);
@@ -325,8 +341,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - mi_sint2korr
      */
     public final int getBeInt16() {
-        if (position + 1 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 1));
+        if (position + 1 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 1));
+        }
 
         byte[] buf = buffer;
         return (buf[position++] << 8) | (0xff & buf[position++]);
@@ -339,9 +356,9 @@ public class LogBuffer {
      */
     public final int getBeUint16(final int pos) {
         final int position = origin + pos;
-
-        if (pos + 1 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 1)));
+        if (pos + 1 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 1)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position + 1]) | ((0xff & buf[position]) << 8);
@@ -353,8 +370,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_usint2korr
      */
     public final int getBeUint16() {
-        if (position + 1 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 1));
+        if (position + 1 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 1));
+        }
 
         byte[] buf = buffer;
         return ((0xff & buf[position++]) << 8) | (0xff & buf[position++]);
@@ -368,8 +386,9 @@ public class LogBuffer {
     public final int getInt24(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 2 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 2)));
+        if (pos + 2 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 2)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position]) | ((0xff & buf[position + 1]) << 8) | ((buf[position + 2]) << 16);
@@ -381,8 +400,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - sint3korr
      */
     public final int getInt24() {
-        if (position + 2 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 2));
+        if (position + 2 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 2));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position++]) | ((0xff & buf[position++]) << 8) | ((buf[position++]) << 16);
@@ -396,8 +416,9 @@ public class LogBuffer {
     public final int getBeInt24(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 2 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 2)));
+        if (pos + 2 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 2)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position + 2]) | ((0xff & buf[position + 1]) << 8) | ((buf[position]) << 16);
@@ -409,8 +430,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_usint3korr
      */
     public final int getBeInt24() {
-        if (position + 2 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 2));
+        if (position + 2 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 2));
+        }
 
         byte[] buf = buffer;
         return ((buf[position++]) << 16) | ((0xff & buf[position++]) << 8) | (0xff & buf[position++]);
@@ -424,8 +446,9 @@ public class LogBuffer {
     public final int getUint24(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 2 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 2)));
+        if (pos + 2 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 2)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position]) | ((0xff & buf[position + 1]) << 8) | ((0xff & buf[position + 2]) << 16);
@@ -437,8 +460,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - uint3korr
      */
     public final int getUint24() {
-        if (position + 2 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 2));
+        if (position + 2 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 2));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position++]) | ((0xff & buf[position++]) << 8) | ((0xff & buf[position++]) << 16);
@@ -452,8 +476,9 @@ public class LogBuffer {
     public final int getBeUint24(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 2 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 2)));
+        if (pos + 2 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 2)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position + 2]) | ((0xff & buf[position + 1]) << 8) | ((0xff & buf[position]) << 16);
@@ -465,8 +490,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_usint3korr
      */
     public final int getBeUint24() {
-        if (position + 2 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 2));
+        if (position + 2 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 2));
+        }
 
         byte[] buf = buffer;
         return ((0xff & buf[position++]) << 16) | ((0xff & buf[position++]) << 8) | (0xff & buf[position++]);
@@ -480,8 +506,9 @@ public class LogBuffer {
     public final int getInt32(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 3 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 3)));
+        if (pos + 3 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 3)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position]) | ((0xff & buf[position + 1]) << 8) | ((0xff & buf[position + 2]) << 16)
@@ -496,8 +523,9 @@ public class LogBuffer {
     public final int getBeInt32(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 3 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 3)));
+        if (pos + 3 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 3)));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position + 3]) | ((0xff & buf[position + 2]) << 8) | ((0xff & buf[position + 1]) << 16)
@@ -510,8 +538,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - sint4korr
      */
     public final int getInt32() {
-        if (position + 3 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 3));
+        if (position + 3 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 3));
+        }
 
         byte[] buf = buffer;
         return (0xff & buf[position++]) | ((0xff & buf[position++]) << 8) | ((0xff & buf[position++]) << 16)
@@ -524,8 +553,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_sint4korr
      */
     public final int getBeInt32() {
-        if (position + 3 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 3));
+        if (position + 3 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 3));
+        }
 
         byte[] buf = buffer;
         return ((buf[position++]) << 24) | ((0xff & buf[position++]) << 16) | ((0xff & buf[position++]) << 8)
@@ -540,8 +570,9 @@ public class LogBuffer {
     public final long getUint32(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 3 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 3)));
+        if (pos + 3 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 3)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position])) | ((long) (0xff & buf[position + 1]) << 8)
@@ -556,8 +587,9 @@ public class LogBuffer {
     public final long getBeUint32(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 3 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 3)));
+        if (pos + 3 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 3)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position + 3])) | ((long) (0xff & buf[position + 2]) << 8)
@@ -570,8 +602,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - uint4korr
      */
     public final long getUint32() {
-        if (position + 3 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 3));
+        if (position + 3 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 3));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++])) | ((long) (0xff & buf[position++]) << 8)
@@ -584,8 +617,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_uint4korr
      */
     public final long getBeUint32() {
-        if (position + 3 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 3));
+        if (position + 3 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 3));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++]) << 24) | ((long) (0xff & buf[position++]) << 16)
@@ -598,8 +632,9 @@ public class LogBuffer {
     public final long getUlong40(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 4 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 4)));
+        if (pos + 4 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 4)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position])) | ((long) (0xff & buf[position + 1]) << 8)
@@ -611,8 +646,9 @@ public class LogBuffer {
      * Return next 40-bit unsigned int from buffer. (little-endian)
      */
     public final long getUlong40() {
-        if (position + 4 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 4));
+        if (position + 4 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 4));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++])) | ((long) (0xff & buf[position++]) << 8)
@@ -628,8 +664,9 @@ public class LogBuffer {
     public final long getBeUlong40(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 4 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 4)));
+        if (pos + 4 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 4)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position + 4])) | ((long) (0xff & buf[position + 3]) << 8)
@@ -643,8 +680,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_uint5korr
      */
     public final long getBeUlong40() {
-        if (position + 4 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 4));
+        if (position + 4 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 4));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++]) << 32) | ((long) (0xff & buf[position++]) << 24)
@@ -660,8 +698,9 @@ public class LogBuffer {
     public final long getLong48(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 5 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 5)));
+        if (pos + 5 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 5)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position])) | ((long) (0xff & buf[position + 1]) << 8)
@@ -677,8 +716,9 @@ public class LogBuffer {
     public final long getBeLong48(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 5 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 5)));
+        if (pos + 5 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 5)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position + 5])) | ((long) (0xff & buf[position + 4]) << 8)
@@ -692,8 +732,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - sint6korr
      */
     public final long getLong48() {
-        if (position + 5 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 5));
+        if (position + 5 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 5));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++])) | ((long) (0xff & buf[position++]) << 8)
@@ -707,8 +748,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_sint6korr
      */
     public final long getBeLong48() {
-        if (position + 5 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 5));
+        if (position + 5 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 5));
+        }
 
         byte[] buf = buffer;
         return ((long) (buf[position++]) << 40) | ((long) (0xff & buf[position++]) << 32)
@@ -724,8 +766,9 @@ public class LogBuffer {
     public final long getUlong48(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 5 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 5)));
+        if (pos + 5 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 5)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position])) | ((long) (0xff & buf[position + 1]) << 8)
@@ -741,8 +784,9 @@ public class LogBuffer {
     public final long getBeUlong48(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 5 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 5)));
+        if (pos + 5 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 5)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position + 5])) | ((long) (0xff & buf[position + 4]) << 8)
@@ -756,8 +800,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - uint6korr
      */
     public final long getUlong48() {
-        if (position + 5 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 5));
+        if (position + 5 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 5));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++])) | ((long) (0xff & buf[position++]) << 8)
@@ -771,8 +816,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_uint6korr
      */
     public final long getBeUlong48() {
-        if (position + 5 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 5));
+        if (position + 5 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 5));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++]) << 40) | ((long) (0xff & buf[position++]) << 32)
@@ -786,8 +832,9 @@ public class LogBuffer {
     public final long getUlong56(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 6 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 6)));
+        if (pos + 6 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 6)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position])) | ((long) (0xff & buf[position + 1]) << 8)
@@ -800,8 +847,9 @@ public class LogBuffer {
      * Return next 56-bit unsigned int from buffer. (little-endian)
      */
     public final long getUlong56() {
-        if (position + 6 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 6));
+        if (position + 6 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 6));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++])) | ((long) (0xff & buf[position++]) << 8)
@@ -816,8 +864,9 @@ public class LogBuffer {
     public final long getBeUlong56(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 6 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 6)));
+        if (pos + 6 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 6)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position + 6])) | ((long) (0xff & buf[position + 5]) << 8)
@@ -830,8 +879,9 @@ public class LogBuffer {
      * Return next 56-bit unsigned int from buffer. (big-endian)
      */
     public final long getBeUlong56() {
-        if (position + 6 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 6));
+        if (position + 6 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 6));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++]) << 48) | ((long) (0xff & buf[position++]) << 40)
@@ -848,8 +898,9 @@ public class LogBuffer {
     public final long getLong64(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 7 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 7)));
+        if (pos + 7 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 7)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position])) | ((long) (0xff & buf[position + 1]) << 8)
@@ -866,8 +917,9 @@ public class LogBuffer {
     public final long getBeLong64(final int pos) {
         final int position = origin + pos;
 
-        if (pos + 7 >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                            + (pos < 0 ? pos : (pos + 7)));
+        if (pos + 7 >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + 7)));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position + 7])) | ((long) (0xff & buf[position + 6]) << 8)
@@ -882,8 +934,9 @@ public class LogBuffer {
      * @see mysql-5.1.60/include/my_global.h - sint8korr
      */
     public final long getLong64() {
-        if (position + 7 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 7));
+        if (position + 7 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 7));
+        }
 
         byte[] buf = buffer;
         return ((long) (0xff & buf[position++])) | ((long) (0xff & buf[position++]) << 8)
@@ -898,8 +951,9 @@ public class LogBuffer {
      * @see mysql-5.6.10/include/myisampack.h - mi_sint8korr
      */
     public final long getBeLong64() {
-        if (position + 7 >= origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                               + (position - origin + 7));
+        if (position + 7 >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position - origin + 7));
+        }
 
         byte[] buf = buffer;
         return ((long) (buf[position++]) << 56) | ((long) (0xff & buf[position++]) << 48)
@@ -1506,8 +1560,9 @@ public class LogBuffer {
      * @param len The length of MY_BITMAP in bits.
      */
     public final void fillBitmap(BitSet bitmap, final int pos, final int len) {
-        if (pos + ((len + 7) / 8) > limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                                         + (pos + (len + 7) / 8));
+        if (pos + ((len + 7) / 8) > limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos + (len + 7) / 8));
+        }
 
         fillBitmap0(bitmap, origin + pos, len);
     }
@@ -1518,9 +1573,9 @@ public class LogBuffer {
      * @param len The length of MY_BITMAP in bits.
      */
     public final void fillBitmap(BitSet bitmap, final int len) {
-        if (position + ((len + 7) / 8) > origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                                            + (position
-                                                                                               + ((len + 7) / 8) - origin));
+        if (position + ((len + 7) / 8) > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + ((len + 7) / 8) - origin));
+        }
 
         position = fillBitmap0(bitmap, position, len);
     }
@@ -1596,7 +1651,9 @@ public class LogBuffer {
      * Fill n bytes in this buffer.
      */
     public final void fillBytes(final int pos, byte[] dest, final int destPos, final int len) {
-        if (pos + len > limit || pos < 0) throw new IllegalArgumentException("limit excceed: " + (pos + len));
+        if (pos + len > limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos + len));
+        }
 
         System.arraycopy(buffer, origin + pos, dest, destPos, len);
     }
@@ -1605,8 +1662,9 @@ public class LogBuffer {
      * Fill next n bytes in this buffer.
      */
     public final void fillBytes(byte[] dest, final int destPos, final int len) {
-        if (position + len > origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                                + (position + len - origin));
+        if (position + len > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
+        }
 
         System.arraycopy(buffer, position, dest, destPos, len);
         position += len;
@@ -1637,6 +1695,76 @@ public class LogBuffer {
         return getData(0, limit);
     }
 
+    /**
+     * mariadb compress log event Get the length of uncompress content.
+     * 
+     * @return 0 means error.
+     */
+    public final long getUncompressLong(int lenPad) {
+        long len = 0;
+        switch (lenPad) {
+            case 1:
+                len = getInt8();
+                break;
+            case 2:
+                len = getBeUint16();
+                break;
+            case 3:
+                len = getBeUint24();
+                break;
+            case 4:
+                len = getBeUint32();
+                break;
+            default:
+                len = 0;
+                break;
+        }
+
+        return len;
+    }
+
+    /**
+     * uncompress mariadb log event
+     * 
+     * @return
+     */
+    public LogBuffer uncompressBuf() {
+        int lenPad = getInt8();
+        long len = getUncompressLong(lenPad & 0x07);
+        int alg = (lenPad & 0x70) >> 4;
+        LogBuffer buffer = null;
+        try {
+            switch (alg) {
+                case 0:
+                    buffer = uncompressZlib(limit - position);
+                    break;
+                default:
+                    // bad algorithm
+                    return this;
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("uncompress failed ", e);
+        }
+
+        if (buffer.limit() != len) {
+            throw new IllegalArgumentException(
+                "uncompress lenght not match, expected : " + len + " , but actual : " + buffer.limit());
+        }
+        return buffer;
+    }
+
+    private LogBuffer uncompressZlib(int len) throws Exception {
+        if (position + len > limit || position < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len));
+        }
+
+        try (DeflateCompressorInputStream in = new DeflateCompressorInputStream(
+            new ByteArrayInputStream(buffer, position, position + len))) {
+            byte[] decodeBytes = IOUtils.toByteArray(in);
+            return new LogBuffer(decodeBytes, 0, decodeBytes.length);
+        }
+    }
+
     /**
      * Return full hexdump from position.
      */

+ 29 - 21
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -126,14 +126,15 @@ public final class LogDecoder {
             TransactionPayloadLogEvent compressEvent = ((TransactionPayloadLogEvent) event);
             LogBuffer iterateBuffer = null;
             if (compressEvent.isCompressByZstd()) {
-                ZstdCompressorInputStream in = new ZstdCompressorInputStream(
-                    new ByteArrayInputStream(compressEvent.getPayload()));
-                byte[] decodeBytes = IOUtils.toByteArray(in);
-                iterateBuffer = new LogBuffer(decodeBytes, 0, decodeBytes.length);
+                try (ZstdCompressorInputStream in = new ZstdCompressorInputStream(
+                    new ByteArrayInputStream(compressEvent.getPayload()))) {
+                    byte[] decodeBytes = IOUtils.toByteArray(in);
+                    iterateBuffer = new LogBuffer(decodeBytes, 0, decodeBytes.length);
+                }
             } else if (compressEvent.isCompressByNone()) {
                 iterateBuffer = new LogBuffer(compressEvent.getPayload(), 0, compressEvent.getPayload().length);
             } else {
-                throw new IllegalArgumentException("unkonow compresstype for " + event.getHeader().getLogFileName()
+                throw new IllegalArgumentException("unknow compress type for " + event.getHeader().getLogFileName()
                                                    + ":" + event.getHeader().getLogPos());
             }
 
@@ -446,6 +447,7 @@ public final class LogDecoder {
                 BinlogCheckPointLogEvent event = new BinlogCheckPointLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
+                logPosition.fileName = event.getFilename();
                 return event;
             }
             case LogEvent.GTID_EVENT: {
@@ -487,31 +489,37 @@ public final class LogDecoder {
                 return event;
             }
             case LogEvent.QUERY_COMPRESSED_EVENT: {
-                if (logger.isWarnEnabled()) {
-                    logger.warn("Skipping unsupported MaraiDB QUERY_COMPRESSED_EVENT from: " + context.getLogPosition());
-                }
-                break;
+                QueryCompressedLogEvent event = new QueryCompressedLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                return event;
             }
             case LogEvent.WRITE_ROWS_COMPRESSED_EVENT_V1:
             case LogEvent.WRITE_ROWS_COMPRESSED_EVENT: {
-                if (logger.isWarnEnabled()) {
-                    logger.warn("Skipping unsupported MaraiDB WRITE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
-                }
-                break;
+                WriteRowsCompressLogEvent event = new WriteRowsCompressLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
+                return event;
             }
             case LogEvent.UPDATE_ROWS_COMPRESSED_EVENT_V1:
             case LogEvent.UPDATE_ROWS_COMPRESSED_EVENT: {
-                if (logger.isWarnEnabled()) {
-                    logger.warn("Skipping unsupported MaraiDB UPDATE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
-                }
-                break;
+                UpdateRowsCompressLogEvent event = new UpdateRowsCompressLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
+                return event;
             }
             case LogEvent.DELETE_ROWS_COMPRESSED_EVENT_V1:
             case LogEvent.DELETE_ROWS_COMPRESSED_EVENT: {
-                if (logger.isWarnEnabled()) {
-                    logger.warn("Skipping unsupported MaraiDB DELETE_ROWS_COMPRESSED_EVENT from: " + context.getLogPosition());
-                }
-                break;
+                DeleteRowsCompressLogEvent event = new DeleteRowsCompressLogEvent(header, buffer, descriptionEvent);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
+                return event;
             }
             default:
                 /*

+ 7 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/DeleteRowsLogEvent.java

@@ -9,9 +9,14 @@ import com.taobao.tddl.dbsync.binlog.LogBuffer;
  * @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
  * @version 1.0
  */
-public final class DeleteRowsLogEvent extends RowsLogEvent {
+public class DeleteRowsLogEvent extends RowsLogEvent {
 
     public DeleteRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
-        super(header, buffer, descriptionEvent);
+        super(header, buffer, descriptionEvent, false, false);
+    }
+
+    public DeleteRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                              boolean compress){
+        super(header, buffer, descriptionEvent, false, compress);
     }
 }

+ 9 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java

@@ -221,6 +221,15 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
                 postHeaderLen[GTID_EVENT - 1] = GTID_HEADER_LEN;
                 postHeaderLen[GTID_LIST_EVENT - 1] = GTID_LIST_HEADER_LEN;
                 postHeaderLen[START_ENCRYPTION_EVENT - 1] = START_ENCRYPTION_HEADER_LEN;
+
+                // mariadb compress
+                postHeaderLen[QUERY_COMPRESSED_EVENT - 1] = QUERY_COMPRESSED_EVENT;
+                postHeaderLen[WRITE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2;
+                postHeaderLen[UPDATE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2;
+                postHeaderLen[DELETE_ROWS_COMPRESSED_EVENT - 1] = ROWS_HEADER_LEN_V2;
+                postHeaderLen[WRITE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1;
+                postHeaderLen[UPDATE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1;
+                postHeaderLen[DELETE_ROWS_COMPRESSED_EVENT_V1 - 1] = ROWS_HEADER_LEN_V1;
                 break;
 
             case 3: /* 4.0.x x>=2 */

+ 4 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java

@@ -1,12 +1,12 @@
 package com.taobao.tddl.dbsync.binlog.event;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * The Common-Header, documented in the table @ref Table_common_header "below",
  * always has the same form and length within one version of MySQL. Each event
@@ -66,7 +66,7 @@ import java.util.Map;
  */
 public final class LogHeader {
 
-    protected final int type;
+    protected int                 type;
 
     /**
      * The offset in the log where this event originally appeared (it is

+ 13 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java

@@ -438,9 +438,13 @@ public class QueryLogEvent extends LogEvent {
     private String          timezone;
 
     public QueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent)
-                                                                                                        throws IOException{
-        super(header);
+                                                                                                         throws IOException{
+        this(header, buffer, descriptionEvent, false);
+    }
 
+    public QueryLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                         boolean compress) throws IOException{
+        super(header);
         final int commonHeaderLen = descriptionEvent.commonHeaderLen;
         final int postHeaderLen = descriptionEvent.postHeaderLen[header.type - 1];
         /*
@@ -495,10 +499,15 @@ public class QueryLogEvent extends LogEvent {
         unpackVariables(buffer, end);
         buffer.position(end);
         buffer.limit(limit);
-
         /* A 2nd variable part; this is common to all versions */
-        final int queryLen = dataLen - dbLen - 1;
         dbname = buffer.getFixName(dbLen + 1);
+        int queryLen = dataLen - dbLen - 1;
+        if (compress) {
+            // mariadb compress log event
+            // see https://github.com/alibaba/canal/issues/4388
+            buffer = buffer.uncompressBuf();
+            queryLen = buffer.limit();
+        }
         if (clientCharset >= 0) {
             charset = CharsetConversion.getNioCharset(clientCharset);
 

+ 3 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RotateLogEvent.java

@@ -88,9 +88,10 @@ public final class RotateLogEvent extends LogEvent {
 
         final int filenameOffset = headerSize + postHeaderLen;
         int filenameLen = buffer.limit() - filenameOffset;
-        if (filenameLen > FN_REFLEN - 1) filenameLen = FN_REFLEN - 1;
+        if (filenameLen > FN_REFLEN - 1) {
+            filenameLen = FN_REFLEN - 1;
+        }
         buffer.position(filenameOffset);
-
         filename = buffer.getFixString(filenameLen);
     }
 

+ 22 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java

@@ -3,11 +3,11 @@ package com.taobao.tddl.dbsync.binlog.event;
 import java.nio.charset.Charset;
 import java.util.BitSet;
 
-import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
+import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
 
 /**
  * Common base class for all row-containing log events.
@@ -118,9 +118,14 @@ public abstract class RowsLogEvent extends LogEvent {
         this(header, buffer, descriptionEvent, false);
     }
 
-    public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent, boolean partial){
-        super(header);
+    public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                        boolean partial){
+        this(header, buffer, descriptionEvent, false, false);
+    }
 
+    public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent, boolean partial,
+                        boolean compress){
+        super(header);
         final int commonHeaderLen = descriptionEvent.commonHeaderLen;
         final int postHeaderLen = descriptionEvent.postHeaderLen[header.type - 1];
         int headerLen = 0;
@@ -167,7 +172,8 @@ public abstract class RowsLogEvent extends LogEvent {
         columns = buffer.getBitmap(columnLen);
 
         if (header.type == UPDATE_ROWS_EVENT_V1 || header.type == UPDATE_ROWS_EVENT
-            || header.type == PARTIAL_UPDATE_ROWS_EVENT) {
+            || header.type == PARTIAL_UPDATE_ROWS_EVENT || header.type == UPDATE_ROWS_COMPRESSED_EVENT
+            || header.type == UPDATE_ROWS_COMPRESSED_EVENT_V1) {
             changeColumns = buffer.getBitmap(columnLen);
         } else {
             changeColumns = columns;
@@ -175,6 +181,18 @@ public abstract class RowsLogEvent extends LogEvent {
 
         // XXX: Don't handle buffer in another thread.
         int dataSize = buffer.limit() - buffer.position();
+        if (compress) {
+            // mariadb compress log event
+            // see https://github.com/alibaba/canal/issues/4388
+            buffer = buffer.uncompressBuf();
+            dataSize = buffer.limit();
+            // rewrite type
+            if (postHeaderLen == FormatDescriptionLogEvent.ROWS_HEADER_LEN_V2) {
+                header.type = header.type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT;
+            } else {
+                header.type = header.type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1;
+            }
+        }
         rowsBuf = buffer.duplicate(dataSize);
     }
 

+ 8 - 3
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/UpdateRowsLogEvent.java

@@ -11,14 +11,19 @@ import com.taobao.tddl.dbsync.binlog.LogBuffer;
  * @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
  * @version 1.0
  */
-public final class UpdateRowsLogEvent extends RowsLogEvent {
+public class UpdateRowsLogEvent extends RowsLogEvent {
 
     public UpdateRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
-        super(header, buffer, descriptionEvent, false);
+        super(header, buffer, descriptionEvent, false , false);
     }
 
     public UpdateRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
                               boolean partial){
-        super(header, buffer, descriptionEvent, partial);
+        super(header, buffer, descriptionEvent, partial ,false);
+    }
+
+    public UpdateRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                              boolean partial , boolean compress){
+        super(header, buffer, descriptionEvent, partial , compress);
     }
 }

+ 7 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/WriteRowsLogEvent.java

@@ -9,9 +9,14 @@ import com.taobao.tddl.dbsync.binlog.LogBuffer;
  * @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
  * @version 1.0
  */
-public final class WriteRowsLogEvent extends RowsLogEvent {
+public class WriteRowsLogEvent extends RowsLogEvent {
 
     public WriteRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
-        super(header, buffer, descriptionEvent);
+        super(header, buffer, descriptionEvent, false, false);
+    }
+
+    public WriteRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                             boolean compress){
+        super(header, buffer, descriptionEvent, false, compress);
     }
 }

+ 12 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/BinlogCheckPointLogEvent.java

@@ -13,9 +13,20 @@ import com.taobao.tddl.dbsync.binlog.event.LogHeader;
  */
 public class BinlogCheckPointLogEvent extends IgnorableLogEvent {
 
+    private final String filename;
+
     public BinlogCheckPointLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
         super(header, buffer, descriptionEvent);
-        // do nothing , just mariadb binlog checkpoint
+        // mariadb binlog checkpoint
+        final int headerSize = descriptionEvent.getCommonHeaderLen();
+        final int postHeaderLen = descriptionEvent.getPostHeaderLen()[getHeader().getType() - 1];
+
+        buffer.position(headerSize);
+        long binlogFileLen = buffer.getUint32();
+        filename = buffer.getFixString((int) binlogFileLen);
     }
 
+    public String getFilename() {
+        return filename;
+    }
 }

+ 1 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/DeleteRowsCompressLogEvent.java

@@ -0,0 +1 @@
+package com.taobao.tddl.dbsync.binlog.event.mariadb;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.LogHeader;

/**
 * mariadb compress rows event
 * 
 * @author jianghang
 * @since 1.1.7
 */
public class DeleteRowsCompressLogEvent extends DeleteRowsLogEvent {

    public DeleteRowsCompressLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
        super(header, buffer, descriptionEvent, true);
    }
}

+ 1 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/QueryCompressedLogEvent.java

@@ -0,0 +1 @@
+package com.taobao.tddl.dbsync.binlog.event.mariadb;

import java.io.IOException;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.LogHeader;
import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;

/**
 * mariadb compress query event
 * 
 * @author jianghang
 * @since 1.1.7
 */
public class QueryCompressedLogEvent extends QueryLogEvent {

    public QueryCompressedLogEvent(LogHeader header, LogBuffer buffer,
                                   FormatDescriptionLogEvent descriptionEvent) throws IOException{
        super(header, buffer, descriptionEvent, true);
    }
}

+ 1 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/UpdateRowsCompressLogEvent.java

@@ -0,0 +1 @@
+package com.taobao.tddl.dbsync.binlog.event.mariadb;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.LogHeader;
import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;

/**
 * mariadb compress rows event
 * 
 * @author jianghang
 * @since 1.1.7
 */
public class UpdateRowsCompressLogEvent extends UpdateRowsLogEvent {

    public UpdateRowsCompressLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
        super(header, buffer, descriptionEvent, false, true);
    }
}

+ 1 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/WriteRowsCompressLogEvent.java

@@ -0,0 +1 @@
+package com.taobao.tddl.dbsync.binlog.event.mariadb;

import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.LogHeader;
import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;

/**
 * mariadb compress rows event
 * 
 * @author jianghang
 * @since 1.1.7
 */
public class WriteRowsCompressLogEvent extends WriteRowsLogEvent {

    public WriteRowsCompressLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
        super(header, buffer, descriptionEvent,  true);
    }
}

+ 6 - 2
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

@@ -6,6 +6,7 @@ import java.sql.DriverManager;
 import java.sql.Statement;
 import java.util.List;
 
+import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -19,12 +20,12 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
         DirectLogFetcher fecther = new DirectLogFetcher();
         try {
             Class.forName("com.mysql.jdbc.Driver");
-            Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");
+            Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "123456");
             Statement statement = connection.createStatement();
             statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
             statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
 
-            fecther.open(connection, "binlog.000002", 4L, 1);
+            fecther.open(connection, "mysql-bin.000002", 4L, 1);
 
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
@@ -51,6 +52,9 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
             case LogEvent.ROTATE_EVENT:
                 binlogFileName = ((RotateLogEvent) event).getFilename();
                 break;
+            case LogEvent.BINLOG_CHECKPOINT_EVENT:
+                binlogFileName = ((BinlogCheckPointLogEvent) event).getFilename();
+                break;
             case LogEvent.WRITE_ROWS_EVENT_V1:
             case LogEvent.WRITE_ROWS_EVENT:
                 parseRowsEvent((WriteRowsLogEvent) event);

+ 45 - 36
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/FileLogFetcherTest.java

@@ -3,6 +3,7 @@ package com.taobao.tddl.dbsync.binlog;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.List;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,43 +44,9 @@ public class FileLogFetcherTest extends BaseLogFetcherTest {
             while (fetcher.fetch()) {
                 LogEvent event = null;
                 event = decoder.decode(fetcher, context);
-                if (event != null) {
-                    int eventType = event.getHeader().getType();
-                    switch (eventType) {
-                        case LogEvent.ROTATE_EVENT:
-                            binlogFileName = ((RotateLogEvent) event).getFilename();
-                            break;
-                        case LogEvent.WRITE_ROWS_EVENT_V1:
-                        case LogEvent.WRITE_ROWS_EVENT:
-                            parseRowsEvent((WriteRowsLogEvent) event);
-                            break;
-                        case LogEvent.UPDATE_ROWS_EVENT_V1:
-                        case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
-                        case LogEvent.UPDATE_ROWS_EVENT:
-                            parseRowsEvent((UpdateRowsLogEvent) event);
-                            break;
-                        case LogEvent.DELETE_ROWS_EVENT_V1:
-                        case LogEvent.DELETE_ROWS_EVENT:
-                            parseRowsEvent((DeleteRowsLogEvent) event);
-                            break;
-                        case LogEvent.QUERY_EVENT:
-                            parseQueryEvent((QueryLogEvent) event);
-                            break;
-                        case LogEvent.ROWS_QUERY_LOG_EVENT:
-                            parseRowsQueryEvent((RowsQueryLogEvent) event);
-                            break;
-                        case LogEvent.ANNOTATE_ROWS_EVENT:
-                            parseAnnotateRowsEvent((AnnotateRowsEvent) event);
-                            break;
-                        case LogEvent.XID_EVENT:
-                            parseXidEvent((XidLogEvent) event);
-                            break;
-                        default:
-                            break;
-                    }
-                }
+                processEvent(event, decoder, context);
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             Assert.fail(e.getMessage());
         } finally {
             try {
@@ -89,4 +56,46 @@ public class FileLogFetcherTest extends BaseLogFetcherTest {
             }
         }
     }
+
+    public void processEvent(LogEvent event, LogDecoder decoder, LogContext context) throws Throwable {
+        int eventType = event.getHeader().getType();
+        switch (eventType) {
+            case LogEvent.ROTATE_EVENT:
+                binlogFileName = ((RotateLogEvent) event).getFilename();
+                break;
+            case LogEvent.WRITE_ROWS_EVENT_V1:
+            case LogEvent.WRITE_ROWS_EVENT:
+                parseRowsEvent((WriteRowsLogEvent) event);
+                break;
+            case LogEvent.UPDATE_ROWS_EVENT_V1:
+            case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
+            case LogEvent.UPDATE_ROWS_EVENT:
+                parseRowsEvent((UpdateRowsLogEvent) event);
+                break;
+            case LogEvent.DELETE_ROWS_EVENT_V1:
+            case LogEvent.DELETE_ROWS_EVENT:
+                parseRowsEvent((DeleteRowsLogEvent) event);
+                break;
+            case LogEvent.QUERY_EVENT:
+                parseQueryEvent((QueryLogEvent) event);
+                break;
+            case LogEvent.ROWS_QUERY_LOG_EVENT:
+                parseRowsQueryEvent((RowsQueryLogEvent) event);
+                break;
+            case LogEvent.ANNOTATE_ROWS_EVENT:
+                parseAnnotateRowsEvent((AnnotateRowsEvent) event);
+                break;
+            case LogEvent.XID_EVENT:
+                parseXidEvent((XidLogEvent) event);
+                break;
+            case LogEvent.TRANSACTION_PAYLOAD_EVENT:
+                List<LogEvent> events = decoder.processIterateDecode(event, context);
+                for (LogEvent deEvent : events) {
+                    processEvent(deEvent, decoder, context);
+                }
+                break;
+            default:
+                break;
+        }
+    }
 }

+ 14 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -114,17 +114,26 @@ public class LocalBinLogConnection implements ErosaConnection {
                         continue;
                     }
                     checkServerId(event);
-
-                    if (!func.sink(event)) {
-                        needContinue = false;
-                        break;
+                    List<LogEvent> iterateEvents = decoder.processIterateDecode(event, context);
+                    if (!iterateEvents.isEmpty()) {
+                        // 处理compress event
+                        for(LogEvent itEvent : iterateEvents) {
+                            if (!func.sink(event)) {
+                                needContinue = false;
+                                break;
+                            }
+                        }
+                    } else {
+                        if (!func.sink(event)) {
+                            needContinue = false;
+                            break;
+                        }
                     }
                 }
 
                 fetcher.close(); // 关闭上一个文件
                 parserFinish(current.getName());
                 if (needContinue) {// 读取下一个
-
                     File nextFile;
                     if (needWait) {
                         nextFile = binlogs.waitForNextFile(current);

+ 12 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -213,8 +213,18 @@ public class MysqlConnection implements ErosaConnection {
                     throw new CanalParseException("parse failed");
                 }
 
-                if (!func.sink(event)) {
-                    break;
+                List<LogEvent> iterateEvents = decoder.processIterateDecode(event, context);
+                if (!iterateEvents.isEmpty()) {
+                    // 处理compress event
+                    for(LogEvent itEvent : iterateEvents) {
+                        if (!func.sink(event)) {
+                            break;
+                        }
+                    }
+                } else {
+                    if (!func.sink(event)) {
+                        break;
+                    }
                 }
             }
         }

+ 149 - 65
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -1,11 +1,14 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
+import org.apache.commons.compress.utils.Lists;
+
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
@@ -16,27 +19,12 @@ import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.WorkHandler;
-import com.lmax.disruptor.WorkerPool;
+import com.lmax.disruptor.*;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
-import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.*;
 
 /**
  * 针对解析器提供一个多阶段协同的处理
@@ -271,48 +259,71 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                 }
 
                 int eventType = logEvent.getHeader().getType();
-                TableMeta tableMeta = null;
-                boolean needDmlParse = false;
-                switch (eventType) {
-                    case LogEvent.WRITE_ROWS_EVENT_V1:
-                    case LogEvent.WRITE_ROWS_EVENT:
-                        if (!filterDmlInsert) {
-                            tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
-                            needDmlParse = true;
-                        }
-                        break;
-                    case LogEvent.UPDATE_ROWS_EVENT_V1:
-                    case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
-                    case LogEvent.UPDATE_ROWS_EVENT:
-                        if (!filterDmlUpdate) {
-                            tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
-                            needDmlParse = true;
-                        }
-                        break;
-                    case LogEvent.DELETE_ROWS_EVENT_V1:
-                    case LogEvent.DELETE_ROWS_EVENT:
-                        if (!filterDmlDelete) {
-                            tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
-                            needDmlParse = true;
-                        }
-                        break;
-                    case LogEvent.ROWS_QUERY_LOG_EVENT:
-                        needDmlParse = true;
-                        break;
-                    default:
-                        CanalEntry.Entry entry = logEventConvert.parse(event.getEvent(), false);
-                        event.setEntry(entry);
+                boolean needIterate = false;
+
+                if (eventType == LogEvent.TRANSACTION_PAYLOAD_EVENT) {
+                    // https://github.com/alibaba/canal/issues/4388
+                    List<LogEvent> deLogEvents = decoder.processIterateDecode(logEvent, context);
+                    List<TableMeta> tableMetas = Lists.newArrayList();
+                    event.setNeedIterate(true);
+                    for (LogEvent deLogEvent : deLogEvents) {
+                        TableMeta table = processEvent(deLogEvent, event);
+                        tableMetas.add(table);
+                    }
+                    event.setIterateEvents(deLogEvents);
+                    event.setIterateTables(tableMetas);
+                } else {
+                    TableMeta table = processEvent(logEvent, event);
+                    event.setTable(table);
                 }
-
-                // 记录一下DML的表结构
-                event.setNeedDmlParse(needDmlParse);
-                event.setTable(tableMeta);
             } catch (Throwable e) {
                 exception = new CanalParseException(e);
                 throw exception;
             }
         }
 
+        private TableMeta processEvent(LogEvent logEvent, MessageEvent event) {
+            TableMeta tableMeta = null;
+            boolean needDmlParse = false;
+            int eventType = logEvent.getHeader().getType();
+            switch (eventType) {
+                case LogEvent.WRITE_ROWS_EVENT_V1:
+                case LogEvent.WRITE_ROWS_EVENT:
+                    if (!filterDmlInsert) {
+                        tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
+                        needDmlParse = true;
+                    }
+                    break;
+                case LogEvent.UPDATE_ROWS_EVENT_V1:
+                case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
+                case LogEvent.UPDATE_ROWS_EVENT:
+                    if (!filterDmlUpdate) {
+                        tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
+                        needDmlParse = true;
+                    }
+                    break;
+                case LogEvent.DELETE_ROWS_EVENT_V1:
+                case LogEvent.DELETE_ROWS_EVENT:
+                    if (!filterDmlDelete) {
+                        tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
+                        needDmlParse = true;
+                    }
+                    break;
+                case LogEvent.ROWS_QUERY_LOG_EVENT:
+                    needDmlParse = true;
+                    break;
+                default:
+                    CanalEntry.Entry entry = logEventConvert.parse(event.getEvent(), false);
+                    event.setEntry(entry);
+            }
+
+            // 记录一下DML的表结构
+            if (needDmlParse && !event.isNeedDmlParse()) {
+                event.setNeedDmlParse(true);
+            }
+            return tableMeta;
+        }
+
         @Override
         public void onStart() {
 
@@ -330,18 +341,21 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         public void onEvent(MessageEvent event) throws Exception {
             try {
                 if (event.isNeedDmlParse()) {
-                    int eventType = event.getEvent().getHeader().getType();
-                    CanalEntry.Entry entry = null;
-                    switch (eventType) {
-                        case LogEvent.ROWS_QUERY_LOG_EVENT:
-                            entry = logEventConvert.parse(event.getEvent(), false);
-                            break;
-                        default:
-                            // 单独解析dml事件
-                            entry = logEventConvert.parseRowsEvent((RowsLogEvent) event.getEvent(), event.getTable());
+                    if (event.isNeedIterate()) {
+                        // compress binlog
+                        List<CanalEntry.Entry> entrys = Lists.newArrayList();
+                        for (int index = 0; index < event.getIterateEvents().size(); index++) {
+                            CanalEntry.Entry entry = processEvent(event.getIterateEvents().get(index),
+                                event.getIterateTables().get(index));
+                            if (entry != null) {
+                                entrys.add(entry);
+                            }
+                        }
+                        event.setIterateEntrys(entrys);
+                    } else {
+                        CanalEntry.Entry entry = processEvent(event.getEvent(), event.getTable());
+                        event.setEntry(entry);
                     }
-
-                    event.setEntry(entry);
                 }
             } catch (Throwable e) {
                 exception = new CanalParseException(e);
@@ -349,6 +363,29 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
             }
         }
 
+        private CanalEntry.Entry processEvent(LogEvent logEvent, TableMeta table) {
+            int eventType = logEvent.getHeader().getType();
+            CanalEntry.Entry entry = null;
+            switch (eventType) {
+                case LogEvent.WRITE_ROWS_EVENT_V1:
+                case LogEvent.WRITE_ROWS_EVENT:
+                case LogEvent.UPDATE_ROWS_EVENT_V1:
+                case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
+                case LogEvent.UPDATE_ROWS_EVENT:
+                case LogEvent.DELETE_ROWS_EVENT_V1:
+                case LogEvent.DELETE_ROWS_EVENT:
+                    // 单独解析dml事件
+                    entry = logEventConvert.parseRowsEvent((RowsLogEvent) logEvent, table);
+                    break;
+                default:
+                    // 如果出现compress binlog,会出现其他的event type类型
+                    entry = logEventConvert.parse(logEvent, false);
+                    break;
+            }
+
+            return entry;
+        }
+
         @Override
         public void onStart() {
 
@@ -364,8 +401,15 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
             try {
-                if (event.getEntry() != null) {
-                    transactionBuffer.add(event.getEntry());
+                if (event.isNeedIterate()) {
+                    // compress binlog
+                    for (CanalEntry.Entry entry : event.getIterateEntrys()) {
+                        transactionBuffer.add(entry);
+                    }
+                } else {
+                    if (event.getEntry() != null) {
+                        transactionBuffer.add(event.getEntry());
+                    }
                 }
 
                 LogEvent logEvent = event.getEvent();
@@ -380,7 +424,11 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                 event.setEvent(null);
                 event.setTable(null);
                 event.setEntry(null);
+                // clear compress binlog events
                 event.setNeedDmlParse(false);
+                event.setIterateEntrys(null);
+                event.setIterateTables(null);
+                event.setIterateEvents(null);
             } catch (Throwable e) {
                 exception = new CanalParseException(e);
                 throw exception;
@@ -405,6 +453,11 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         private boolean          needDmlParse = false;
         private TableMeta        table;
         private LogEvent         event;
+        private boolean                needIterate  = false;
+        // compress binlog
+        private List<LogEvent>         iterateEvents;
+        private List<TableMeta>        iterateTables;
+        private List<CanalEntry.Entry> iterateEntrys;
 
         public LogBuffer getBuffer() {
             return buffer;
@@ -446,6 +499,37 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
             this.table = table;
         }
 
+        public boolean isNeedIterate() {
+            return needIterate;
+        }
+
+        public void setNeedIterate(boolean needIterate) {
+            this.needIterate = needIterate;
+        }
+
+        public List<LogEvent> getIterateEvents() {
+            return iterateEvents;
+        }
+
+        public List<TableMeta> getIterateTables() {
+            return iterateTables;
+        }
+
+        public void setIterateEvents(List<LogEvent> iterateEvents) {
+            this.iterateEvents = iterateEvents;
+        }
+
+        public void setIterateTables(List<TableMeta> iterateTables) {
+            this.iterateTables = iterateTables;
+        }
+
+        public List<CanalEntry.Entry> getIterateEntrys() {
+            return iterateEntrys;
+        }
+
+        public void setIterateEntrys(List<CanalEntry.Entry> iterateEntrys) {
+            this.iterateEntrys = iterateEntrys;
+        }
     }
 
     static class SimpleFatalExceptionHandler implements ExceptionHandler {

+ 55 - 50
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -32,19 +32,10 @@ import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
-import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer;
-import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.*;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
-import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
 
 @Ignore
 public class DirectLogFetcherTest {
@@ -63,7 +54,7 @@ public class DirectLogFetcherTest {
             updateSettings(connector);
             loadBinlogChecksum(connector);
             sendRegisterSlave(connector, 3);
-            sendBinlogDump(connector, "mysql-bin.000001", 4L, 3);
+            sendBinlogDump(connector, "mysql-bin.000002", 4L, 3);
 
             fetcher.start(connector.getChannel());
 
@@ -78,45 +69,9 @@ public class DirectLogFetcherTest {
                 if (event == null) {
                     throw new RuntimeException("parse failed");
                 }
-
-                int eventType = event.getHeader().getType();
-                switch (eventType) {
-                    case LogEvent.ROTATE_EVENT:
-                        // binlogFileName = ((RotateLogEvent)
-                        // event).getFilename();
-                        System.out.println(((RotateLogEvent) event).getFilename());
-                        break;
-                    case LogEvent.TABLE_MAP_EVENT:
-                        parseTableMapEvent((TableMapLogEvent) event);
-                        break;
-                    case LogEvent.WRITE_ROWS_EVENT_V1:
-                    case LogEvent.WRITE_ROWS_EVENT:
-                        parseRowsEvent((WriteRowsLogEvent) event);
-                        break;
-                    case LogEvent.UPDATE_ROWS_EVENT_V1:
-                    case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
-                    case LogEvent.UPDATE_ROWS_EVENT:
-                        parseRowsEvent((UpdateRowsLogEvent) event);
-                        break;
-                    case LogEvent.DELETE_ROWS_EVENT_V1:
-                    case LogEvent.DELETE_ROWS_EVENT:
-                        parseRowsEvent((DeleteRowsLogEvent) event);
-                        break;
-                    case LogEvent.QUERY_EVENT:
-                        parseQueryEvent((QueryLogEvent) event);
-                        break;
-                    case LogEvent.ROWS_QUERY_LOG_EVENT:
-                        parseRowsQueryEvent((RowsQueryLogEvent) event);
-                        break;
-                    case LogEvent.ANNOTATE_ROWS_EVENT:
-                        break;
-                    case LogEvent.XID_EVENT:
-                        break;
-                    default:
-                        break;
-                }
+                processEvent(event, decoder, context);
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
         } finally {
@@ -129,6 +84,56 @@ public class DirectLogFetcherTest {
 
     }
 
+    private void processEvent(LogEvent event, LogDecoder decoder, LogContext context) throws Throwable {
+        int eventType = event.getHeader().getType();
+        switch (eventType) {
+            case LogEvent.ROTATE_EVENT:
+                // binlogFileName = ((RotateLogEvent)
+                // event).getFilename();
+                System.out.println("RotateLogEvent : " + ((RotateLogEvent) event).getFilename());
+                break;
+            case LogEvent.BINLOG_CHECKPOINT_EVENT:
+                // binlogFileName = ((BinlogCheckPointLogEvent)
+                // event).getFilename();
+                System.out.println("BinlogCheckPointLogEvent : " + ((BinlogCheckPointLogEvent) event).getFilename());
+                break;
+            case LogEvent.TABLE_MAP_EVENT:
+                parseTableMapEvent((TableMapLogEvent) event);
+                break;
+            case LogEvent.WRITE_ROWS_EVENT_V1:
+            case LogEvent.WRITE_ROWS_EVENT:
+                parseRowsEvent((WriteRowsLogEvent) event);
+                break;
+            case LogEvent.UPDATE_ROWS_EVENT_V1:
+            case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
+            case LogEvent.UPDATE_ROWS_EVENT:
+                parseRowsEvent((UpdateRowsLogEvent) event);
+                break;
+            case LogEvent.DELETE_ROWS_EVENT_V1:
+            case LogEvent.DELETE_ROWS_EVENT:
+                parseRowsEvent((DeleteRowsLogEvent) event);
+                break;
+            case LogEvent.QUERY_EVENT:
+                parseQueryEvent((QueryLogEvent) event);
+                break;
+            case LogEvent.ROWS_QUERY_LOG_EVENT:
+                parseRowsQueryEvent((RowsQueryLogEvent) event);
+                break;
+            case LogEvent.ANNOTATE_ROWS_EVENT:
+                break;
+            case LogEvent.XID_EVENT:
+                break;
+            case LogEvent.TRANSACTION_PAYLOAD_EVENT:
+                List<LogEvent> events = decoder.processIterateDecode(event, context);
+                for (LogEvent deEvent : events) {
+                    processEvent(deEvent, decoder, context);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
     private void sendRegisterSlave(MysqlConnector connector, int slaveId) throws IOException {
         RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
         cmd.reportHost = connector.getAddress().getAddress().getHostAddress();

+ 2 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java

@@ -22,12 +22,12 @@ public class MysqlBinlogDumpPerformanceTest {
 
     public static void main(String args[]) {
         final MysqlEventParser controller = new MysqlEventParser();
-        final EntryPosition startPosition = new EntryPosition("mysql-bin.000007", 89796293L, 100L);
+        final EntryPosition startPosition = new EntryPosition("binlog.000002", 4L, 100L);
         controller.setConnectionCharset("UTF-8");
         controller.setSlaveId(3344L);
         controller.setDetectingEnable(false);
         controller.setFilterQueryDml(true);
-        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("100.81.154.142", 3306), "canal", "canal"));
+        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal"));
         controller.setMasterPosition(startPosition);
         controller.setEnableTsdb(false);
         controller.setDestination("example");

+ 42 - 36
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java

@@ -5,10 +5,13 @@ import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.BitSet;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.junit.Ignore;
+
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
@@ -19,14 +22,8 @@ import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
-import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer;
-import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.*;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
-import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
-import org.junit.Ignore;
 
 @Ignore
 public class MysqlBinlogParsePerformanceTest {
@@ -44,7 +41,7 @@ public class MysqlBinlogParsePerformanceTest {
             Thread thread = new Thread(() -> {
                 try {
                     consumer(buffer);
-                } catch (IOException | InterruptedException e) {
+                } catch (Throwable e) {
                     e.printStackTrace();
                 }
             });
@@ -59,7 +56,7 @@ public class MysqlBinlogParsePerformanceTest {
         }
     }
 
-    public static void consumer(BlockingQueue<LogBuffer> buffer) throws IOException, InterruptedException {
+    public static void consumer(BlockingQueue<LogBuffer> buffer) throws Throwable {
         LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
         LogContext context = new LogContext();
 
@@ -70,33 +67,7 @@ public class MysqlBinlogParsePerformanceTest {
         while (true) {
             LogEvent event = null;
             event = decoder.decode(buffer.take(), context);
-            int eventType = event.getHeader().getType();
-            switch (eventType) {
-                case LogEvent.ROTATE_EVENT:
-                    break;
-                case LogEvent.WRITE_ROWS_EVENT_V1:
-                case LogEvent.WRITE_ROWS_EVENT:
-                    parseRowsEvent((WriteRowsLogEvent) event, sum);
-                    break;
-                case LogEvent.UPDATE_ROWS_EVENT_V1:
-                case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
-                case LogEvent.UPDATE_ROWS_EVENT:
-                    parseRowsEvent((UpdateRowsLogEvent) event, sum);
-                    break;
-                case LogEvent.DELETE_ROWS_EVENT_V1:
-                case LogEvent.DELETE_ROWS_EVENT:
-                    parseRowsEvent((DeleteRowsLogEvent) event, sum);
-                    break;
-                case LogEvent.XID_EVENT:
-                    sum.incrementAndGet();
-                    break;
-                case LogEvent.QUERY_EVENT:
-                    sum.incrementAndGet();
-                    break;
-                default:
-                    break;
-            }
-
+            processEvent(event, decoder, context, sum);
             long current = sum.get();
             if (current - last >= 100000) {
                 end = System.currentTimeMillis();
@@ -108,6 +79,41 @@ public class MysqlBinlogParsePerformanceTest {
         }
     }
 
+    private static void processEvent(LogEvent event, LogDecoder decoder, LogContext context, AtomicLong sum) throws Throwable {
+        int eventType = event.getHeader().getType();
+        switch (eventType) {
+            case LogEvent.ROTATE_EVENT:
+                break;
+            case LogEvent.WRITE_ROWS_EVENT_V1:
+            case LogEvent.WRITE_ROWS_EVENT:
+                parseRowsEvent((WriteRowsLogEvent) event, sum);
+                break;
+            case LogEvent.UPDATE_ROWS_EVENT_V1:
+            case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
+            case LogEvent.UPDATE_ROWS_EVENT:
+                parseRowsEvent((UpdateRowsLogEvent) event, sum);
+                break;
+            case LogEvent.DELETE_ROWS_EVENT_V1:
+            case LogEvent.DELETE_ROWS_EVENT:
+                parseRowsEvent((DeleteRowsLogEvent) event, sum);
+                break;
+            case LogEvent.XID_EVENT:
+                sum.incrementAndGet();
+                break;
+            case LogEvent.QUERY_EVENT:
+                sum.incrementAndGet();
+                break;
+            case LogEvent.TRANSACTION_PAYLOAD_EVENT:
+                List<LogEvent> events = decoder.processIterateDecode(event, context);
+                for (LogEvent deEvent : events) {
+                    processEvent(deEvent, decoder, context, sum);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
     private static void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId)
                                                                                                                          throws IOException {
         BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();