Просмотр исходного кода

fixed issue #1070, support mysql8.0.13

七锋 6 лет назад
Родитель
Сommit
6ff57f53f8
19 измененных файлов с 1213 добавлено и 110 удалено
  1. 50 32
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonConversion.java
  2. 154 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java
  3. 3 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java
  4. 8 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
  5. 9 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java
  6. 1 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java
  7. 83 18
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java
  8. 64 22
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java
  9. 23 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java
  10. 341 7
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java
  11. 6 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/UpdateRowsLogEvent.java
  12. 1 0
      dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java
  13. 1 0
      dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/FileLogFetcherTest.java
  14. 258 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/CharsetUtil.java
  15. 51 0
      driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/CharsetUtilTest.java
  16. 1 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  17. 38 18
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  18. 118 6
      parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java
  19. 3 2
      parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java

+ 50 - 32
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonConversion.java

@@ -1,5 +1,9 @@
 package com.taobao.tddl.dbsync.binlog;
 
+import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.appendNumber2;
+import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.appendNumber4;
+import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.usecondsToStr;
+
 /**
  * 处理下MySQL json二进制转化为可读的字符串
  * 
@@ -330,12 +334,30 @@ public class JsonConversion {
                             long ultime = Math.abs(packed_value);
                             long intpart = ultime >> 24;
                             int frac = (int) (ultime % (1L << 24));
-                            text = String.format("%s%02d:%02d:%02d",
-                                packed_value >= 0 ? "" : "-",
-                                (int) ((intpart >> 12) % (1 << 10)),
-                                (int) ((intpart >> 6) % (1 << 6)),
-                                (int) (intpart % (1 << 6)));
-                            text = text + "." + usecondsToStr(frac, 6);
+                            // text = String.format("%s%02d:%02d:%02d",
+                            // packed_value >= 0 ? "" : "-",
+                            // (int) ((intpart >> 12) % (1 << 10)),
+                            // (int) ((intpart >> 6) % (1 << 6)),
+                            // (int) (intpart % (1 << 6)));
+                            // text = text + "." + usecondsToStr(frac, 6);
+                            StringBuilder builder = new StringBuilder(17);
+                            if (packed_value < 0) {
+                                builder.append('-');
+                            }
+
+                            int d = (int) ((intpart >> 12) % (1 << 10));
+                            if (d > 100) {
+                                builder.append(String.valueOf(d));
+                            } else {
+                                appendNumber2(builder, d);
+                            }
+                            builder.append(':');
+                            appendNumber2(builder, (int) ((intpart >> 6) % (1 << 6)));
+                            builder.append(':');
+                            appendNumber2(builder, (int) (intpart % (1 << 6)));
+
+                            builder.append('.').append(usecondsToStr(frac, 6));
+                            text = builder.toString();
                         }
                         buf.append('"').append(text).append('"');
                     } else if (m_field_type == LogEvent.MYSQL_TYPE_DATE || m_field_type == LogEvent.MYSQL_TYPE_DATETIME
@@ -351,14 +373,28 @@ public class JsonConversion {
                             long ymd = intpart >> 17;
                             long ym = ymd >> 5;
                             long hms = intpart % (1 << 17);
-                            text = String.format("%04d-%02d-%02d %02d:%02d:%02d",
-                                (int) (ym / 13),
-                                (int) (ym % 13),
-                                (int) (ymd % (1 << 5)),
-                                (int) (hms >> 12),
-                                (int) ((hms >> 6) % (1 << 6)),
-                                (int) (hms % (1 << 6)));
-                            text = text + "." + usecondsToStr(frac, 6);
+                            // text =
+                            // String.format("%04d-%02d-%02d %02d:%02d:%02d",
+                            // (int) (ym / 13),
+                            // (int) (ym % 13),
+                            // (int) (ymd % (1 << 5)),
+                            // (int) (hms >> 12),
+                            // (int) ((hms >> 6) % (1 << 6)),
+                            // (int) (hms % (1 << 6)));
+                            StringBuilder builder = new StringBuilder(26);
+                            appendNumber4(builder, (int) (ym / 13));
+                            builder.append('-');
+                            appendNumber2(builder, (int) (ym % 13));
+                            builder.append('-');
+                            appendNumber2(builder, (int) (ymd % (1 << 5)));
+                            builder.append(' ');
+                            appendNumber2(builder, (int) (hms >> 12));
+                            builder.append(':');
+                            appendNumber2(builder, (int) ((hms >> 6) % (1 << 6)));
+                            builder.append(':');
+                            appendNumber2(builder, (int) (hms % (1 << 6)));
+                            builder.append('.').append(usecondsToStr(frac, 6));
+                            text = builder.toString();
                         }
                         buf.append('"').append(text).append('"');
                     } else {
@@ -397,22 +433,4 @@ public class JsonConversion {
         OBJECT, ARRAY, STRING, INT, UINT, DOUBLE, LITERAL_NULL, LITERAL_TRUE, LITERAL_FALSE, OPAQUE, ERROR
     }
 
-    private static String usecondsToStr(int frac, int meta) {
-        String sec = String.valueOf(frac);
-        if (meta > 6) {
-            throw new IllegalArgumentException("unknow useconds meta : " + meta);
-        }
-
-        if (sec.length() < 6) {
-            StringBuilder result = new StringBuilder(6);
-            int len = 6 - sec.length();
-            for (; len > 0; len--) {
-                result.append('0');
-            }
-            result.append(sec);
-            sec = result.toString();
-        }
-
-        return sec.substring(0, meta);
-    }
 }

+ 154 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java

@@ -0,0 +1,154 @@
+package com.taobao.tddl.dbsync.binlog;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.taobao.tddl.dbsync.binlog.JsonConversion.Json_Value;
+import com.taobao.tddl.dbsync.binlog.JsonConversion.Json_enum_type;
+
+/**
+ * 处理mysql8.0 parital json diff解析
+ * 
+ * @author agapple 2018年11月4日 下午3:53:46
+ * @since 1.1.2
+ */
+public class JsonDiffConversion {
+
+    /**
+     * The JSON value in the given path is replaced with a new value. It has the
+     * same effect as `JSON_REPLACE(col, path, value)`.
+     */
+    public static final int DIFF_OPERATION_REPLACE    = 0;
+    /**
+     * Add a new element at the given path. If the path specifies an array
+     * element, it has the same effect as `JSON_ARRAY_INSERT(col, path, value)`.
+     * If the path specifies an object member, it has the same effect as
+     * `JSON_INSERT(col, path, value)`.
+     */
+    public static final int DIFF_OPERATION_INSERT     = 1;
+    /**
+     * The JSON value at the given path is removed from an array or object. It
+     * has the same effect as `JSON_REMOVE(col, path)`.
+     */
+    public static final int DIFF_OPERATION_REMOVE     = 2;
+
+    public static final int JSON_DIFF_OPERATION_COUNT = 3;
+
+    public static StringBuilder print_json_diff(LogBuffer buffer, long len, String columnName, int columnIndex,
+                                                String charsetName) {
+        int position = buffer.position();
+        List<String> operation_names = new ArrayList<String>();
+        while (buffer.hasRemaining()) {
+            int operation_int = buffer.getUint8();
+            if (operation_int >= JSON_DIFF_OPERATION_COUNT) {
+                throw new IllegalArgumentException("reading operation type (invalid operation code)");
+            }
+
+            // skip path
+            long path_length = buffer.getPackedLong();
+            if (path_length > len) {
+                throw new IllegalArgumentException("skipping path");
+            }
+
+            // compute operation name
+            byte[] lastP = buffer.getData(buffer.position() + (int) path_length - 1, 1);
+            String operation_name = json_diff_operation_name(operation_int, lastP[0]);
+            operation_names.add(operation_name);
+
+            buffer.forward((int) path_length);
+            // skip value
+            if (operation_int != DIFF_OPERATION_REMOVE) {
+                long value_length = buffer.getPackedLong();
+                if (value_length > len) {
+                    throw new IllegalArgumentException("skipping path");
+                }
+
+                buffer.forward((int) value_length);
+            }
+        }
+
+        // Print function names in reverse order.
+        StringBuilder builder = new StringBuilder();
+        for (int i = operation_names.size() - 1; i >= 0; i--) {
+            if (i == 0 || operation_names.get(i - 1) != operation_names.get(i)) {
+                builder.append(operation_names.get(i)).append("(");
+            }
+        }
+
+        // Print column id
+        if (columnName != null) {
+            builder.append(columnName);
+        } else {
+            builder.append("@").append(columnIndex);
+        }
+
+        // In case this vector is empty (a no-op), make an early return
+        // after printing only the column name
+        if (operation_names.size() == 0) {
+            return builder;
+        }
+
+        // Print comma between column name and next function argument
+        builder.append(", ");
+        // Print paths and values.
+        buffer.position(position);
+        int diff_i = 0;
+        while (buffer.hasRemaining()) {
+            // Read operation
+            int operation_int = buffer.getUint8();
+
+            // Read path length
+            long path_length = buffer.getPackedLong();
+            // Print path
+            builder.append('\'').append(buffer.getFixString((int) path_length)).append('\'');
+
+            if (operation_int != DIFF_OPERATION_REMOVE) {
+                // Print comma between path and value
+                builder.append(", ");
+                // Read value length
+                long value_length = buffer.getPackedLong();
+
+                Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(),
+                    buffer,
+                    value_length - 1,
+                    charsetName);
+                buffer.forward((int) value_length - 1);
+                // Read value
+                if (jsonValue.m_type == Json_enum_type.ERROR) {
+                    throw new IllegalArgumentException("parsing json value");
+                }
+                StringBuilder jsonBuilder = new StringBuilder();
+                jsonValue.toJsonString(jsonBuilder, charsetName);
+                builder.append(jsonBuilder);
+            }
+
+            // Print closing parenthesis
+            if (!buffer.hasRemaining() || operation_names.get(diff_i + 1) != operation_names.get(diff_i)) {
+                builder.append(")");
+            }
+
+            if (buffer.hasRemaining()) {
+                builder.append(", ");
+            }
+            diff_i++;
+        }
+
+        return builder;
+    }
+
+    private static String json_diff_operation_name(int operationType, int last_path_char) {
+        switch (operationType) {
+            case DIFF_OPERATION_REPLACE:
+                return "JSON_REPLACE";
+            case DIFF_OPERATION_INSERT:
+                if (last_path_char == ']') {
+                    return "JSON_ARRAY_INSERT";
+                } else {
+                    return "JSON_INSERT";
+                }
+            case DIFF_OPERATION_REMOVE:
+                return "JSON_REMOVE";
+        }
+        throw new IllegalArgumentException("illeagal operationType : " + operationType);
+    }
+}

+ 3 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java

@@ -1484,7 +1484,9 @@ public class LogBuffer {
 
         for (int bit = 0; bit < len; bit += 8) {
             int flag = ((int) buf[pos++]) & 0xff;
-            if (flag == 0) continue;
+            if (flag == 0) {
+                continue;
+            }
             if ((flag & 0x01) != 0) bitmap.set(bit);
             if ((flag & 0x02) != 0) bitmap.set(bit + 1);
             if ((flag & 0x04) != 0) bitmap.set(bit + 2);

+ 8 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -372,6 +372,14 @@ public final class LogDecoder {
                 header.putGtid(context.getGtidSet(), gtidLogEvent);
                 return event;
             }
+            case LogEvent.PARTIAL_UPDATE_ROWS_EVENT: {
+                RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent, true);
+                /* updating position in context */
+                logPosition.position = header.getLogPos();
+                event.fillTable(context);
+                header.putGtid(context.getGtidSet(), gtidLogEvent);
+                return event;
+            }
             case LogEvent.GTID_LOG_EVENT:
             case LogEvent.ANONYMOUS_GTID_LOG_EVENT: {
                 GtidLogEvent event = new GtidLogEvent(header, buffer, descriptionEvent);

+ 9 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogEvent.java

@@ -169,9 +169,15 @@ public abstract class LogEvent {
     /* Prepared XA transaction terminal event similar to Xid */
     public static final int    XA_PREPARE_LOG_EVENT                     = 38;
 
+    /**
+     * Extension of UPDATE_ROWS_EVENT, allowing partial values according to
+     * binlog_row_value_options.
+     */
+    public static final int    PARTIAL_UPDATE_ROWS_EVENT                = 39;
+
     // mariaDb 5.5.34
     /* New MySQL/Sun events are to be added right above this comment */
-    public static final int    MYSQL_EVENTS_END                         = 39;
+    public static final int    MYSQL_EVENTS_END                         = 49;
 
     public static final int    MARIA_EVENTS_BEGIN                       = 160;
     /* New Maria event numbers start from here */
@@ -361,6 +367,8 @@ public abstract class LogEvent {
                 return "Anonymous_Gtid";
             case PREVIOUS_GTIDS_LOG_EVENT:
                 return "Previous_gtids";
+            case PARTIAL_UPDATE_ROWS_EVENT:
+                return "Update_rows_partial";
             default:
                 return "Unknown"; /* impossible */
         }

+ 1 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/FormatDescriptionLogEvent.java

@@ -211,6 +211,7 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
                 postHeaderLen[TRANSACTION_CONTEXT_EVENT - 1] = TRANSACTION_CONTEXT_HEADER_LEN;
                 postHeaderLen[VIEW_CHANGE_EVENT - 1] = VIEW_CHANGE_HEADER_LEN;
                 postHeaderLen[XA_PREPARE_LOG_EVENT - 1] = XA_PREPARE_HEADER_LEN;
+                postHeaderLen[PARTIAL_UPDATE_ROWS_EVENT - 1] = ROWS_HEADER_LEN_V2;
 
                 // mariadb 10
                 postHeaderLen[ANNOTATE_ROWS_EVENT - 1] = ANNOTATE_ROWS_HEADER_LEN;

+ 83 - 18
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java

@@ -1,6 +1,7 @@
 package com.taobao.tddl.dbsync.binlog.event;
 
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.charset.Charset;
 
 import com.taobao.tddl.dbsync.binlog.CharsetConversion;
@@ -285,12 +286,12 @@ public class QueryLogEvent extends LogEvent {
                                                                   * type,
                                                                   * sql_mode
                                                                   */
-                                                         + 1 + 1 + 255 /*
-                                                                        * type,
-                                                                        * length
-                                                                        * ,
-                                                                        * catalog
-                                                                        */
+                                                         + 1 + 1 + 255/*
+                                                                       * type,
+                                                                       * length
+                                                                       * ,
+                                                                       * catalog
+                                                                       */
                                                          + 1 + 4 /*
                                                                   * type,
                                                                   * auto_increment
@@ -330,14 +331,32 @@ public class QueryLogEvent extends LogEvent {
                                                           * MariaDb type,
                                                           * sec_part of NOW()
                                                           */
-                                                         + 1 + (MAX_DBS_IN_EVENT_MTS * (1 + NAME_LEN)) + 3 + 1 + 32 * 3
-                                                         + 1 + 60/*
-                                                                  * type ,
-                                                                  * user_len ,
-                                                                  * user ,
-                                                                  * host_len ,
-                                                                  * host
-                                                                  */);
+                                                         + 1 + (MAX_DBS_IN_EVENT_MTS * (1 + NAME_LEN)) + 3 /*
+                                                                                                            * type
+                                                                                                            * ,
+                                                                                                            * microseconds
+                                                                                                            */+ 1 + 32
+                                                         * 3 + 1 + 60/*
+                                                                      * type ,
+                                                                      * user_len
+                                                                      * , user ,
+                                                                      * host_len
+                                                                      * , host
+                                                                      */)
+                                                        + 1 + 1 /*
+                                                                 * type,
+                                                                 * explicit_def
+                                                                 * ..ts
+                                                                 */+ 1 + 8 /*
+                                                                            * type,
+                                                                            * xid
+                                                                            * of
+                                                                            * DDL
+                                                                            */+ 1 + 2 /*
+                                                                                       * type
+                                                                                       * ,
+                                                                                       * default_collation_for_utf8mb4_number
+                                                                                       */+ 1 /* sql_require_primary_key */;
     /**
      * Fixed data part:
      * <ul>
@@ -394,7 +413,7 @@ public class QueryLogEvent extends LogEvent {
     // inspection by the DBA
     private final long      execTime;
     private final int       errorCode;
-    private final long      sessionId;                                                            /* thread_id */
+    private final long      sessionId;                                                                                     /* thread_id */
 
     /**
      * 'flags2' is a second set of flags (on top of those in Log_event), for
@@ -412,6 +431,8 @@ public class QueryLogEvent extends LogEvent {
     private int             clientCharset             = -1;
     private int             clientCollation           = -1;
     private int             serverCollation           = -1;
+    private int             tvSec                     = -1;
+    private BigInteger      ddlXid                    = BigInteger.valueOf(-1L);
     private String          charsetName;
 
     private String          timezone;
@@ -562,6 +583,21 @@ public class QueryLogEvent extends LogEvent {
      */
     public static final int Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP = 16;
 
+    /**
+     * The variable carries xid info of 2pc-aware (recoverable) DDL queries.
+     */
+    public static final int Q_DDL_LOGGED_WITH_XID             = 17;
+    /**
+     * This variable stores the default collation for the utf8mb4 character set.
+     * Used to support cross-version replication.
+     */
+    public static final int Q_DEFAULT_COLLATION_FOR_UTF8MB4   = 18;
+
+    /**
+     * Replicate sql_require_primary_key.
+     */
+    public static final int Q_SQL_REQUIRE_PRIMARY_KEY         = 19;
+
     /**
      * FROM MariaDB 5.5.34
      */
@@ -625,7 +661,7 @@ public class QueryLogEvent extends LogEvent {
                         break;
                     case Q_MICROSECONDS:
                         // when.tv_usec= uint3korr(pos);
-                        buffer.forward(3);
+                        tvSec = buffer.getInt24();
                         break;
                     case Q_UPDATED_DB_NAMES:
                         int mtsAccessedDbs = buffer.getUint8();
@@ -646,6 +682,19 @@ public class QueryLogEvent extends LogEvent {
                         }
                         break;
                     case Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:
+                        // thd->variables.explicit_defaults_for_timestamp
+                        buffer.forward(1);
+                        break;
+                    case Q_DDL_LOGGED_WITH_XID:
+                        ddlXid = buffer.getUlong64();
+                        break;
+                    case Q_DEFAULT_COLLATION_FOR_UTF8MB4:
+                        // int2store(start,
+                        // default_collation_for_utf8mb4_number);
+                        buffer.forward(2);
+                        break;
+                    case Q_SQL_REQUIRE_PRIMARY_KEY:
+                        // *start++ = thd->variables.sql_require_primary_key;
                         buffer.forward(1);
                         break;
                     case Q_HRNOW:
@@ -657,8 +706,10 @@ public class QueryLogEvent extends LogEvent {
                          * That's why you must write status vars in growing
                          * order of code
                          */
-                        if (logger.isDebugEnabled()) logger.debug("Query_log_event has unknown status vars (first has code: "
-                                                                  + code + "), skipping the rest of them");
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Query_log_event has unknown status vars (first has code: " + code
+                                         + "), skipping the rest of them");
+                        }
                         break; // Break loop
                 }
             }
@@ -695,6 +746,12 @@ public class QueryLogEvent extends LogEvent {
                 return "Q_UPDATED_DB_NAMES";
             case Q_MICROSECONDS:
                 return "Q_MICROSECONDS";
+            case Q_DDL_LOGGED_WITH_XID:
+                return "Q_DDL_LOGGED_WITH_XID";
+            case Q_DEFAULT_COLLATION_FOR_UTF8MB4:
+                return "Q_DEFAULT_COLLATION_FOR_UTF8MB4";
+            case Q_SQL_REQUIRE_PRIMARY_KEY:
+                return "Q_SQL_REQUIRE_PRIMARY_KEY";
         }
         return "CODE#" + code;
     }
@@ -777,6 +834,14 @@ public class QueryLogEvent extends LogEvent {
         return serverCollation;
     }
 
+    public int getTvSec() {
+        return tvSec;
+    }
+
+    public BigInteger getDdlXid() {
+        return ddlXid;
+    }
+
     /**
      * Returns the sql_mode value.
      * <p>

+ 64 - 22
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -10,6 +10,7 @@ import org.apache.commons.logging.LogFactory;
 
 import com.taobao.tddl.dbsync.binlog.JsonConversion;
 import com.taobao.tddl.dbsync.binlog.JsonConversion.Json_Value;
+import com.taobao.tddl.dbsync.binlog.JsonDiffConversion;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
@@ -31,22 +32,33 @@ public final class RowsLogBuffer {
 
     private final LogBuffer    buffer;
     private final int          columnLen;
+    private final int          jsonColumnCount;
     private final String       charsetName;
-    // private Calendar cal;
 
     private final BitSet       nullBits;
     private int                nullBitIndex;
 
+    // Read value_options if this is AI for PARTIAL_UPDATE_ROWS_EVENT
+    private final boolean      partial;
+    private final BitSet       partialBits;
+
     private boolean            fNull;
     private int                javaType;
     private int                length;
     private Serializable       value;
 
-    public RowsLogBuffer(LogBuffer buffer, final int columnLen, String charsetName){
+    public RowsLogBuffer(LogBuffer buffer, final int columnLen, String charsetName, int jsonColumnCount, boolean partial){
         this.buffer = buffer;
         this.columnLen = columnLen;
         this.charsetName = charsetName;
+        this.partial = partial;
+        this.jsonColumnCount = jsonColumnCount;
         this.nullBits = new BitSet(columnLen);
+        this.partialBits = new BitSet(1);
+    }
+
+    public final boolean nextOneRow(BitSet columns) {
+        return nextOneRow(columns, false);
     }
 
     /**
@@ -55,18 +67,30 @@ public final class RowsLogBuffer {
      * @see mysql-5.1.60/sql/log_event.cc -
      * Rows_log_event::print_verbose_one_row
      */
-    public final boolean nextOneRow(BitSet columns) {
+    public final boolean nextOneRow(BitSet columns, boolean after) {
         final boolean hasOneRow = buffer.hasRemaining();
 
         if (hasOneRow) {
             int column = 0;
 
             for (int i = 0; i < columnLen; i++)
-                if (columns.get(i)) column++;
+                if (columns.get(i)) {
+                    column++;
+                }
 
+            if (after && partial) {
+                partialBits.clear();
+                long valueOptions = buffer.getPackedLong();
+                int PARTIAL_JSON_UPDATES = 1;
+                if ((valueOptions & PARTIAL_JSON_UPDATES) != 0) {
+                    partialBits.set(1);
+                    buffer.forward((jsonColumnCount + 7) / 8);
+                }
+            }
             nullBitIndex = 0;
             nullBits.clear();
             buffer.fillBitmap(nullBits, column);
+
         }
         return hasOneRow;
     }
@@ -77,8 +101,8 @@ public final class RowsLogBuffer {
      * @see mysql-5.1.60/sql/log_event.cc -
      * Rows_log_event::print_verbose_one_row
      */
-    public final Serializable nextValue(final int type, final int meta) {
-        return nextValue(type, meta, false);
+    public final Serializable nextValue(final String columName, final int columnIndex, final int type, final int meta) {
+        return nextValue(columName, columnIndex, type, meta, false);
     }
 
     /**
@@ -87,7 +111,8 @@ public final class RowsLogBuffer {
      * @see mysql-5.1.60/sql/log_event.cc -
      * Rows_log_event::print_verbose_one_row
      */
-    public final Serializable nextValue(final int type, final int meta, boolean isBinary) {
+    public final Serializable nextValue(final String columName, final int columnIndex, final int type, final int meta,
+                                        boolean isBinary) {
         fNull = nullBits.get(nullBitIndex++);
 
         if (fNull) {
@@ -97,7 +122,7 @@ public final class RowsLogBuffer {
             return null;
         } else {
             // Extracting field value from packed buffer.
-            return fetchValue(type, meta, isBinary);
+            return fetchValue(columName, columnIndex, type, meta, isBinary);
         }
     }
 
@@ -248,7 +273,7 @@ public final class RowsLogBuffer {
      * 
      * @see mysql-5.1.60/sql/log_event.cc - log_event_print_value
      */
-    final Serializable fetchValue(int type, final int meta, boolean isBinary) {
+    final Serializable fetchValue(String columnName, int columnIndex, int type, final int meta, boolean isBinary) {
         int len = 0;
 
         if (type == LogEvent.MYSQL_TYPE_STRING) {
@@ -610,7 +635,7 @@ public final class RowsLogBuffer {
                     // (u32 % 10000) / 100,
                     // u32 % 100);
 
-                    StringBuilder builder = new StringBuilder(12);
+                    StringBuilder builder = new StringBuilder(17);
                     if (i32 < 0) {
                         builder.append('-');
                     }
@@ -1041,17 +1066,34 @@ public final class RowsLogBuffer {
                     default:
                         throw new IllegalArgumentException("!! Unknown JSON packlen = " + meta);
                 }
-                if (0 == len) {
-                    // fixed issue #1 by lava, json column of zero length has no
-                    // value, value parsing should be skipped
-                    value = "";
-                } else {
+
+                if (partialBits.get(1)) {
+                    // print_json_diff
                     int position = buffer.position();
-                    Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(), buffer, len - 1, charsetName);
-                    StringBuilder builder = new StringBuilder();
-                    jsonValue.toJsonString(builder, charsetName);
+                    StringBuilder builder = JsonDiffConversion.print_json_diff(buffer,
+                        len,
+                        columnName,
+                        columnIndex,
+                        charsetName);
                     value = builder.toString();
                     buffer.position(position + len);
+                } else {
+                    if (0 == len) {
+                        // fixed issue #1 by lava, json column of zero length
+                        // has no
+                        // value, value parsing should be skipped
+                        value = "";
+                    } else {
+                        int position = buffer.position();
+                        Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(),
+                            buffer,
+                            len - 1,
+                            charsetName);
+                        StringBuilder builder = new StringBuilder();
+                        jsonValue.toJsonString(builder, charsetName);
+                        value = builder.toString();
+                        buffer.position(position + len);
+                    }
                 }
                 javaType = Types.VARCHAR;
                 length = len;
@@ -1120,7 +1162,7 @@ public final class RowsLogBuffer {
         return length;
     }
 
-    private String usecondsToStr(int frac, int meta) {
+    public static String usecondsToStr(int frac, int meta) {
         String sec = String.valueOf(frac);
         if (meta > 6) {
             throw new IllegalArgumentException("unknow useconds meta : " + meta);
@@ -1139,7 +1181,7 @@ public final class RowsLogBuffer {
         return sec.substring(0, meta);
     }
 
-    private void appendNumber4(StringBuilder builder, int d) {
+    public static void appendNumber4(StringBuilder builder, int d) {
         if (d >= 1000) {
             builder.append(digits[d / 1000])
                 .append(digits[(d / 100) % 10])
@@ -1151,7 +1193,7 @@ public final class RowsLogBuffer {
         }
     }
 
-    private void appendNumber3(StringBuilder builder, int d) {
+    public static void appendNumber3(StringBuilder builder, int d) {
         if (d >= 100) {
             builder.append(digits[d / 100]).append(digits[(d / 10) % 10]).append(digits[d % 10]);
         } else {
@@ -1160,7 +1202,7 @@ public final class RowsLogBuffer {
         }
     }
 
-    private void appendNumber2(StringBuilder builder, int d) {
+    public static void appendNumber2(StringBuilder builder, int d) {
         if (d >= 10) {
             builder.append(digits[(d / 10) % 10]).append(digits[d % 10]);
         } else {

+ 23 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java

@@ -5,6 +5,7 @@ import java.util.BitSet;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
 
 /**
  * Common base class for all row-containing log events.
@@ -61,6 +62,7 @@ public abstract class RowsLogEvent extends LogEvent {
 
     /** Bitmap denoting columns available */
     protected final int      columnLen;
+    protected final boolean  partial;
     protected final BitSet   columns;
 
     /**
@@ -71,6 +73,8 @@ public abstract class RowsLogEvent extends LogEvent {
      */
     protected final BitSet   changeColumns;
 
+    protected int            jsonColumnCount         = 0;
+
     /** XXX: Don't handle buffer in another thread. */
     private final LogBuffer  rowsBuf;                           /*
                                                                   * The rows in
@@ -109,6 +113,10 @@ public abstract class RowsLogEvent extends LogEvent {
     public static final int  RW_V_EXTRAINFO_TAG      = 0;
 
     public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
+        this(header, buffer, descriptionEvent, false);
+    }
+
+    public RowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent, boolean partial){
         super(header);
 
         final int commonHeaderLen = descriptionEvent.commonHeaderLen;
@@ -153,9 +161,11 @@ public abstract class RowsLogEvent extends LogEvent {
 
         buffer.position(commonHeaderLen + postHeaderLen + headerLen);
         columnLen = (int) buffer.getPackedLong();
+        this.partial = partial;
         columns = buffer.getBitmap(columnLen);
 
-        if (header.type == UPDATE_ROWS_EVENT_V1 || header.type == UPDATE_ROWS_EVENT) {
+        if (header.type == UPDATE_ROWS_EVENT_V1 || header.type == UPDATE_ROWS_EVENT
+            || header.type == PARTIAL_UPDATE_ROWS_EVENT) {
             changeColumns = buffer.getBitmap(columnLen);
         } else {
             changeColumns = columns;
@@ -175,6 +185,17 @@ public abstract class RowsLogEvent extends LogEvent {
             // delete original table map events stored in the map).
             context.clearAllTables();
         }
+
+        int jsonColumnCount = 0;
+        int columnCnt = table.getColumnCnt();
+        ColumnInfo[] columnInfo = table.getColumnInfo();
+        for (int i = 0; i < columnCnt; i++) {
+            ColumnInfo info = columnInfo[i];
+            if (info.type == LogEvent.MYSQL_TYPE_JSON) {
+                jsonColumnCount++;
+            }
+        }
+        this.jsonColumnCount = jsonColumnCount;
     }
 
     public final long getTableId() {
@@ -194,7 +215,7 @@ public abstract class RowsLogEvent extends LogEvent {
     }
 
     public final RowsLogBuffer getRowsBuf(String charsetName) {
-        return new RowsLogBuffer(rowsBuf, columnLen, charsetName);
+        return new RowsLogBuffer(rowsBuf, columnLen, charsetName, jsonColumnCount, partial);
     }
 
     public final int getFlags(final int flags) {

+ 341 - 7
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java

@@ -1,6 +1,8 @@
 package com.taobao.tddl.dbsync.binlog.event;
 
+import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.List;
 
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
@@ -88,6 +90,17 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
  * first byte, the ninth is in the least significant bit of the second byte, and
  * so on.</td>
  * </tr>
+ * <tr>
+ * <td>optional metadata fields</td>
+ * <td>optional metadata fields are stored in Type, Length, Value(TLV) format.
+ * Type takes 1 byte. Length is a packed integer value. Values takes Length
+ * bytes.</td>
+ * <td>There are some optional metadata defined. They are listed in the table
+ * 
+ * @ref Table_table_map_event_optional_metadata. Optional metadata fields follow
+ * null_bits. Whether binlogging an optional metadata is decided by the server.
+ * The order is not defined, so they can be binlogged in any order. </td>
+ * </tr>
  * </table>
  * The table below lists all column types, along with the numerical identifier
  * for it and the size and interpretation of meta-data used to describe the
@@ -284,7 +297,6 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
  * of the geometry: 1, 2, 3, or 4.</td>
  * </tr>
  * </table>
- * 
  * @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
  * @version 1.0
  */
@@ -322,20 +334,63 @@ public final class TableMapLogEvent extends LogEvent {
      */
     public static final class ColumnInfo {
 
-        public int type;
-        public int meta;
+        public int          type;
+        public int          meta;
+        public String       name;
+        public boolean      unsigned;
+        public boolean      pk;
+        public List<String> set_enum_values;
+        public int          charset;        // 可以通过CharsetUtil进行转化
+        public int          geoType;
+        public boolean      nullable;
+
+        @Override
+        public String toString() {
+            return "ColumnInfo [type=" + type + ", meta=" + meta + ", name=" + name + ", unsigned=" + unsigned
+                   + ", pk=" + pk + ", set_enum_values=" + set_enum_values + ", charset=" + charset + ", geoType="
+                   + geoType + ", nullable=" + nullable + "]";
+        }
     }
 
     protected final int          columnCnt;
-    protected final ColumnInfo[] columnInfo;         // buffer for field
-                                                      // metadata
+    protected final ColumnInfo[] columnInfo;                     // buffer
+                                                                  // for
+                                                                  // field
+                                                                  // metadata
 
     protected final long         tableId;
     protected BitSet             nullBits;
 
     /** TM = "Table Map" */
-    public static final int      TM_MAPID_OFFSET = 0;
-    public static final int      TM_FLAGS_OFFSET = 6;
+    public static final int      TM_MAPID_OFFSET         = 0;
+    public static final int      TM_FLAGS_OFFSET         = 6;
+
+    // UNSIGNED flag of numeric columns
+    public static final int      SIGNEDNESS              = 1;
+    // Default character set of string columns
+    public static final int      DEFAULT_CHARSET         = 2;
+    // Character set of string columns
+    public static final int      COLUMN_CHARSET          = 3;
+    public static final int      COLUMN_NAME             = 4;
+    // String value of SET columns
+    public static final int      SET_STR_VALUE           = 5;
+    // String value of ENUM columns
+    public static final int      ENUM_STR_VALUE          = 6;
+    // Real type of geometry columns
+    public static final int      GEOMETRY_TYPE           = 7;
+    // Primary key without prefix
+    public static final int      SIMPLE_PRIMARY_KEY      = 8;
+    // Primary key with prefix
+    public static final int      PRIMARY_KEY_WITH_PREFIX = 9;
+
+    private int                  default_charset;
+    private boolean              existOptionalMetaData   = false;
+
+    private static final class Pair {
+
+        public int col_index;
+        public int col_charset;
+    }
 
     /**
      * Constructor used by slave to read the event from the binary log.
@@ -379,7 +434,91 @@ public final class TableMapLogEvent extends LogEvent {
             final int fieldSize = (int) buffer.getPackedLong();
             decodeFields(buffer, fieldSize);
             nullBits = buffer.getBitmap(columnCnt);
+
+            for (int i = 0; i < columnCnt; i++) {
+                if (nullBits.get(i)) {
+                    columnInfo[i].nullable = true;
+                }
+            }
+            /*
+             * After null_bits field, there are some new fields for extra
+             * metadata.
+             */
+            existOptionalMetaData = false;
+            List<TableMapLogEvent.Pair> defaultCharsetPairs = null;
+            List<Integer> columnCharsets = null;
+            while (buffer.hasRemaining()) {
+                // optional metadata fields
+                int type = buffer.getUint8();
+                int len = (int) buffer.getPackedLong();
+
+                switch (type) {
+                    case SIGNEDNESS:
+                        parse_signedness(buffer, len);
+                        break;
+                    case DEFAULT_CHARSET:
+                        defaultCharsetPairs = parse_default_charset(buffer, len);
+                        break;
+                    case COLUMN_CHARSET:
+                        columnCharsets = parse_column_charset(buffer, len);
+                        break;
+                    case COLUMN_NAME:
+                        // set @@global.binlog_row_metadata='FULL'
+                        // 主要是补充列名相关信息
+                        existOptionalMetaData = true;
+                        parse_column_name(buffer, len);
+                        break;
+                    case SET_STR_VALUE:
+                        parse_set_str_value(buffer, len, true);
+                        break;
+                    case ENUM_STR_VALUE:
+                        parse_set_str_value(buffer, len, false);
+                        break;
+                    case GEOMETRY_TYPE:
+                        parse_geometry_type(buffer, len);
+                        break;
+                    case SIMPLE_PRIMARY_KEY:
+                        parse_simple_pk(buffer, len);
+                        break;
+                    case PRIMARY_KEY_WITH_PREFIX:
+                        parse_pk_with_prefix(buffer, len);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("unknow type : " + type);
+                }
+            }
+
+            if (existOptionalMetaData) {
+                int index = 0;
+                int char_col_index = 0;
+                for (int i = 0; i < columnCnt; i++) {
+                    int cs = -1;
+                    int type = getRealType(columnInfo[i].type, columnInfo[i].meta);
+                    if (is_character_type(type)) {
+                        if (defaultCharsetPairs != null && !defaultCharsetPairs.isEmpty()) {
+                            if (index < defaultCharsetPairs.size()
+                                && char_col_index == defaultCharsetPairs.get(index).col_index) {
+                                cs = defaultCharsetPairs.get(index).col_charset;
+                                index++;
+                            } else {
+                                cs = default_charset;
+                            }
+
+                            char_col_index++;
+                        } else if (columnCharsets != null) {
+                            cs = columnCharsets.get(index);
+                            index++;
+                        }
+
+                        columnInfo[i].charset = cs;
+                    }
+                }
+            }
         }
+
+        // for (int i = 0; i < columnCnt; i++) {
+        // System.out.println(columnInfo[i]);
+        // }
     }
 
     /**
@@ -459,6 +598,192 @@ public final class TableMapLogEvent extends LogEvent {
         buffer.limit(limit);
     }
 
+    private void parse_signedness(LogBuffer buffer, int length) {
+        // stores the signedness flags extracted from field
+        List<Boolean> datas = new ArrayList<Boolean>();
+        for (int i = 0; i < length; i++) {
+            int ut = buffer.getUint8();
+            for (int c = 0x80; c != 0; c >>= 1) {
+                datas.add((ut & c) > 0);
+            }
+        }
+
+        int index = 0;
+        for (int i = 0; i < columnCnt; i++) {
+            if (is_numeric_type(columnInfo[i].type)) {
+                columnInfo[i].unsigned = datas.get(index);
+                index++;
+            }
+        }
+    }
+
+    private List<TableMapLogEvent.Pair> parse_default_charset(LogBuffer buffer, int length) {
+        // stores collation numbers extracted from field.
+        int limit = buffer.position() + length;
+        this.default_charset = (int) buffer.getPackedLong();
+        List<TableMapLogEvent.Pair> datas = new ArrayList<TableMapLogEvent.Pair>();
+        while (buffer.hasRemaining() && buffer.position() < limit) {
+            int col_index = (int) buffer.getPackedLong();
+            int col_charset = (int) buffer.getPackedLong();
+
+            Pair pair = new Pair();
+            pair.col_index = col_index;
+            pair.col_charset = col_charset;
+            datas.add(pair);
+        }
+
+        return datas;
+    }
+
+    private List<Integer> parse_column_charset(LogBuffer buffer, int length) {
+        // stores collation numbers extracted from field.
+        int limit = buffer.position() + length;
+        List<Integer> datas = new ArrayList<Integer>();
+        while (buffer.hasRemaining() && buffer.position() < limit) {
+            int col_charset = (int) buffer.getPackedLong();
+            datas.add(col_charset);
+        }
+
+        return datas;
+    }
+
+    private void parse_column_name(LogBuffer buffer, int length) {
+        // stores column names extracted from field
+        int limit = buffer.position() + length;
+        int index = 0;
+        while (buffer.hasRemaining() && buffer.position() < limit) {
+            int len = (int) buffer.getPackedLong();
+            columnInfo[index++].name = buffer.getFixString(len);
+        }
+    }
+
+    private void parse_set_str_value(LogBuffer buffer, int length, boolean set) {
+        // stores SET/ENUM column's string values extracted from
+        // field. Each SET/ENUM column's string values are stored
+        // into a string separate vector. All of them are stored
+        // in 'vec'.
+        int limit = buffer.position() + length;
+        List<List<String>> datas = new ArrayList<List<String>>();
+        while (buffer.hasRemaining() && buffer.position() < limit) {
+            int count = (int) buffer.getPackedLong();
+            List<String> data = new ArrayList<String>(count);
+            for (int i = 0; i < count; i++) {
+                int len1 = (int) buffer.getPackedLong();
+                data.add(buffer.getFixString(len1));
+            }
+
+            datas.add(data);
+        }
+
+        int index = 0;
+        for (int i = 0; i < columnCnt; i++) {
+            if (set && getRealType(columnInfo[i].type, columnInfo[i].meta) == LogEvent.MYSQL_TYPE_SET) {
+                columnInfo[i].set_enum_values = datas.get(index);
+                index++;
+            }
+
+            if (!set && getRealType(columnInfo[i].type, columnInfo[i].meta) == LogEvent.MYSQL_TYPE_ENUM) {
+                columnInfo[i].set_enum_values = datas.get(index);
+                index++;
+            }
+        }
+    }
+
+    private void parse_geometry_type(LogBuffer buffer, int length) {
+        // stores geometry column's types extracted from field.
+        int limit = buffer.position() + length;
+
+        List<Integer> datas = new ArrayList<Integer>();
+        while (buffer.hasRemaining() && buffer.position() < limit) {
+            int col_type = (int) buffer.getPackedLong();
+            datas.add(col_type);
+        }
+
+        int index = 0;
+        for (int i = 0; i < columnCnt; i++) {
+            if (columnInfo[i].type == LogEvent.MYSQL_TYPE_GEOMETRY) {
+                columnInfo[i].geoType = datas.get(index);
+                index++;
+            }
+        }
+    }
+
+    private void parse_simple_pk(LogBuffer buffer, int length) {
+        // stores primary key's column information extracted from
+        // field. Each column has an index and a prefix which are
+        // stored as a unit_pair. prefix is always 0 for
+        // SIMPLE_PRIMARY_KEY field.
+
+        int limit = buffer.position() + length;
+        while (buffer.hasRemaining() && buffer.position() < limit) {
+            int col_index = (int) buffer.getPackedLong();
+            columnInfo[col_index].pk = true;
+        }
+    }
+
+    private void parse_pk_with_prefix(LogBuffer buffer, int length) {
+        // stores primary key's column information extracted from
+        // field. Each column has an index and a prefix which are
+        // stored as a unit_pair.
+        int limit = buffer.position() + length;
+        while (buffer.hasRemaining() && buffer.position() < limit) {
+            int col_index = (int) buffer.getPackedLong();
+            // prefix length, 比如 char(32)
+            @SuppressWarnings("unused")
+            int col_prefix = (int) buffer.getPackedLong();
+            columnInfo[col_index].pk = true;
+        }
+    }
+
+    private boolean is_numeric_type(int type) {
+        switch (type) {
+            case MYSQL_TYPE_TINY:
+            case MYSQL_TYPE_SHORT:
+            case MYSQL_TYPE_INT24:
+            case MYSQL_TYPE_LONG:
+            case MYSQL_TYPE_LONGLONG:
+            case MYSQL_TYPE_NEWDECIMAL:
+            case MYSQL_TYPE_FLOAT:
+            case MYSQL_TYPE_DOUBLE:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private boolean is_character_type(int type) {
+        switch (type) {
+            case MYSQL_TYPE_STRING:
+            case MYSQL_TYPE_VAR_STRING:
+            case MYSQL_TYPE_VARCHAR:
+            case MYSQL_TYPE_BLOB:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private int getRealType(int type, int meta) {
+        if (type == LogEvent.MYSQL_TYPE_STRING) {
+            if (meta >= 256) {
+                int byte0 = meta >> 8;
+                if ((byte0 & 0x30) != 0x30) {
+                    /* a long CHAR() field: see #37426 */
+                    type = byte0 | 0x30;
+                } else {
+                    switch (byte0) {
+                        case LogEvent.MYSQL_TYPE_SET:
+                        case LogEvent.MYSQL_TYPE_ENUM:
+                        case LogEvent.MYSQL_TYPE_STRING:
+                            type = byte0;
+                    }
+                }
+            }
+        }
+
+        return type;
+    }
+
     public final String getDbName() {
         return dbname;
     }
@@ -478,4 +803,13 @@ public final class TableMapLogEvent extends LogEvent {
     public final long getTableId() {
         return tableId;
     }
+
+    public boolean isExistOptionalMetaData() {
+        return existOptionalMetaData;
+    }
+
+    public void setExistOptionalMetaData(boolean existOptional) {
+        this.existOptionalMetaData = existOptional;
+    }
+
 }

+ 6 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/UpdateRowsLogEvent.java

@@ -14,6 +14,11 @@ import com.taobao.tddl.dbsync.binlog.LogBuffer;
 public final class UpdateRowsLogEvent extends RowsLogEvent {
 
     public UpdateRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
-        super(header, buffer, descriptionEvent);
+        super(header, buffer, descriptionEvent, false);
+    }
+
+    public UpdateRowsLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent,
+                              boolean partial){
+        super(header, buffer, descriptionEvent, partial);
     }
 }

+ 1 - 0
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

@@ -45,6 +45,7 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
                         parseRowsEvent((WriteRowsLogEvent) event);
                         break;
                     case LogEvent.UPDATE_ROWS_EVENT_V1:
+                    case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
                     case LogEvent.UPDATE_ROWS_EVENT:
                         parseRowsEvent((UpdateRowsLogEvent) event);
                         break;

+ 1 - 0
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/FileLogFetcherTest.java

@@ -53,6 +53,7 @@ public class FileLogFetcherTest extends BaseLogFetcherTest {
                             parseRowsEvent((WriteRowsLogEvent) event);
                             break;
                         case LogEvent.UPDATE_ROWS_EVENT_V1:
+                        case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
                         case LogEvent.UPDATE_ROWS_EVENT:
                             parseRowsEvent((UpdateRowsLogEvent) event);
                             break;

+ 258 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/CharsetUtil.java

@@ -0,0 +1,258 @@
+package com.alibaba.otter.canal.parse.driver.mysql.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * mysql collation转换mapping关系表
+ * 
+ * @author agapple 2018年11月5日 下午1:01:15
+ * @since 1.1.2
+ */
+public class CharsetUtil {
+
+    private static final String[]             INDEX_TO_CHARSET = new String[2048];
+    private static final Map<String, Integer> CHARSET_TO_INDEX = new HashMap<String, Integer>();
+    static {
+        INDEX_TO_CHARSET[1] = "big5";
+        INDEX_TO_CHARSET[84] = "big5";
+
+        INDEX_TO_CHARSET[3] = "dec8";
+        INDEX_TO_CHARSET[69] = "dec8";
+
+        INDEX_TO_CHARSET[4] = "cp850";
+        INDEX_TO_CHARSET[80] = "cp850";
+
+        INDEX_TO_CHARSET[6] = "hp8";
+        INDEX_TO_CHARSET[72] = "hp8";
+
+        INDEX_TO_CHARSET[7] = "koi8r";
+        INDEX_TO_CHARSET[74] = "koi8r";
+
+        INDEX_TO_CHARSET[5] = "latin1";
+        INDEX_TO_CHARSET[8] = "latin1";
+        INDEX_TO_CHARSET[15] = "latin1";
+        INDEX_TO_CHARSET[31] = "latin1";
+        INDEX_TO_CHARSET[47] = "latin1";
+        INDEX_TO_CHARSET[48] = "latin1";
+        INDEX_TO_CHARSET[49] = "latin1";
+        INDEX_TO_CHARSET[94] = "latin1";
+
+        INDEX_TO_CHARSET[9] = "latin2";
+        INDEX_TO_CHARSET[21] = "latin2";
+        INDEX_TO_CHARSET[27] = "latin2";
+        INDEX_TO_CHARSET[77] = "latin2";
+
+        INDEX_TO_CHARSET[10] = "swe7";
+        INDEX_TO_CHARSET[82] = "swe7";
+
+        INDEX_TO_CHARSET[11] = "ascii";
+        INDEX_TO_CHARSET[65] = "ascii";
+
+        INDEX_TO_CHARSET[12] = "ujis";
+        INDEX_TO_CHARSET[91] = "ujis";
+
+        INDEX_TO_CHARSET[13] = "sjis";
+        INDEX_TO_CHARSET[88] = "sjis";
+
+        INDEX_TO_CHARSET[16] = "hebrew";
+        INDEX_TO_CHARSET[71] = "hebrew";
+
+        INDEX_TO_CHARSET[18] = "tis620";
+        INDEX_TO_CHARSET[69] = "tis620";
+
+        INDEX_TO_CHARSET[19] = "euckr";
+        INDEX_TO_CHARSET[85] = "euckr";
+
+        INDEX_TO_CHARSET[22] = "koi8u";
+        INDEX_TO_CHARSET[75] = "koi8u";
+
+        INDEX_TO_CHARSET[24] = "gb2312";
+        INDEX_TO_CHARSET[86] = "gb2312";
+
+        INDEX_TO_CHARSET[25] = "greek";
+        INDEX_TO_CHARSET[70] = "greek";
+
+        INDEX_TO_CHARSET[26] = "cp1250";
+        INDEX_TO_CHARSET[34] = "cp1250";
+        INDEX_TO_CHARSET[44] = "cp1250";
+        INDEX_TO_CHARSET[66] = "cp1250";
+        INDEX_TO_CHARSET[99] = "cp1250";
+
+        INDEX_TO_CHARSET[28] = "gbk";
+        INDEX_TO_CHARSET[87] = "gbk";
+
+        INDEX_TO_CHARSET[30] = "latin5";
+        INDEX_TO_CHARSET[78] = "latin5";
+
+        INDEX_TO_CHARSET[32] = "armscii8";
+        INDEX_TO_CHARSET[64] = "armscii8";
+
+        INDEX_TO_CHARSET[33] = "utf8";
+        INDEX_TO_CHARSET[83] = "utf8";
+        for (int i = 192; i <= 223; i++) {
+            INDEX_TO_CHARSET[i] = "utf8";
+        }
+        for (int i = 336; i <= 337; i++) {
+            INDEX_TO_CHARSET[i] = "utf8";
+        }
+        for (int i = 352; i <= 357; i++) {
+            INDEX_TO_CHARSET[i] = "utf8";
+        }
+        INDEX_TO_CHARSET[368] = "utf8";
+        INDEX_TO_CHARSET[2047] = "utf8";
+
+        INDEX_TO_CHARSET[35] = "ucs2";
+        INDEX_TO_CHARSET[90] = "ucs2";
+        for (int i = 128; i <= 151; i++) {
+            INDEX_TO_CHARSET[i] = "ucs2";
+        }
+        INDEX_TO_CHARSET[159] = "ucs2";
+        for (int i = 358; i <= 360; i++) {
+            INDEX_TO_CHARSET[i] = "ucs2";
+        }
+
+        INDEX_TO_CHARSET[36] = "cp866";
+        INDEX_TO_CHARSET[68] = "cp866";
+
+        INDEX_TO_CHARSET[37] = "keybcs2";
+        INDEX_TO_CHARSET[73] = "keybcs2";
+
+        INDEX_TO_CHARSET[38] = "macce";
+        INDEX_TO_CHARSET[43] = "macce";
+
+        INDEX_TO_CHARSET[39] = "macroman";
+        INDEX_TO_CHARSET[53] = "macroman";
+
+        INDEX_TO_CHARSET[40] = "cp852";
+        INDEX_TO_CHARSET[81] = "cp852";
+
+        INDEX_TO_CHARSET[20] = "latin7";
+        INDEX_TO_CHARSET[41] = "latin7";
+        INDEX_TO_CHARSET[42] = "latin7";
+        INDEX_TO_CHARSET[79] = "latin7";
+
+        INDEX_TO_CHARSET[45] = "utf8mb4";
+        INDEX_TO_CHARSET[46] = "utf8mb4";
+        for (int i = 224; i <= 247; i++) {
+            INDEX_TO_CHARSET[i] = "utf8mb4";
+        }
+        for (int i = 255; i <= 271; i++) {
+            INDEX_TO_CHARSET[i] = "utf8mb4";
+        }
+        for (int i = 273; i <= 275; i++) {
+            INDEX_TO_CHARSET[i] = "utf8mb4";
+        }
+        for (int i = 277; i <= 294; i++) {
+            INDEX_TO_CHARSET[i] = "utf8mb4";
+        }
+        for (int i = 296; i <= 298; i++) {
+            INDEX_TO_CHARSET[i] = "utf8mb4";
+        }
+        INDEX_TO_CHARSET[300] = "utf8mb4";
+        for (int i = 303; i <= 307; i++) {
+            INDEX_TO_CHARSET[i] = "utf8mb4";
+        }
+        INDEX_TO_CHARSET[326] = "utf8mb4";
+        INDEX_TO_CHARSET[328] = "utf8mb4";
+
+        INDEX_TO_CHARSET[14] = "cp1251";
+        INDEX_TO_CHARSET[23] = "cp1251";
+        INDEX_TO_CHARSET[50] = "cp1251";
+        INDEX_TO_CHARSET[51] = "cp1251";
+        INDEX_TO_CHARSET[52] = "cp1251";
+
+        INDEX_TO_CHARSET[54] = "utf16";
+        INDEX_TO_CHARSET[55] = "utf16";
+        for (int i = 101; i <= 124; i++) {
+            INDEX_TO_CHARSET[i] = "utf16";
+        }
+        INDEX_TO_CHARSET[327] = "utf16";
+
+        INDEX_TO_CHARSET[56] = "utf16le";
+        INDEX_TO_CHARSET[62] = "utf16le";
+
+        INDEX_TO_CHARSET[57] = "cp1256";
+        INDEX_TO_CHARSET[67] = "cp1256";
+
+        INDEX_TO_CHARSET[29] = "cp1257";
+        INDEX_TO_CHARSET[58] = "cp1257";
+        INDEX_TO_CHARSET[59] = "cp1257";
+
+        INDEX_TO_CHARSET[60] = "utf32";
+        INDEX_TO_CHARSET[61] = "utf32";
+        for (int i = 160; i <= 183; i++) {
+            INDEX_TO_CHARSET[i] = "utf32";
+        }
+        INDEX_TO_CHARSET[391] = "utf32";
+
+        INDEX_TO_CHARSET[63] = "binary";
+
+        INDEX_TO_CHARSET[92] = "geostd8";
+        INDEX_TO_CHARSET[93] = "geostd8";
+
+        INDEX_TO_CHARSET[95] = "cp932";
+        INDEX_TO_CHARSET[96] = "cp932";
+
+        INDEX_TO_CHARSET[97] = "eucjpms";
+        INDEX_TO_CHARSET[98] = "eucjpms";
+
+        for (int i = 248; i <= 250; i++) {
+            INDEX_TO_CHARSET[i] = "gb18030";
+        }
+
+        // charset --> index
+        for (int i = 0; i < 2048; i++) {
+            String charset = INDEX_TO_CHARSET[i];
+            if (charset != null && CHARSET_TO_INDEX.get(charset) == null) {
+                CHARSET_TO_INDEX.put(charset, i);
+            }
+        }
+        CHARSET_TO_INDEX.put("iso-8859-1", 14);
+        CHARSET_TO_INDEX.put("iso_8859_1", 14);
+        CHARSET_TO_INDEX.put("utf-8", 33);
+        CHARSET_TO_INDEX.put("utf8mb4", 45);
+    }
+
+    public static final String getCharset(int index) {
+        return INDEX_TO_CHARSET[index];
+    }
+
+    public static final int getIndex(String charset) {
+        if (charset == null || charset.length() == 0) {
+            return 0;
+        } else {
+            Integer i = CHARSET_TO_INDEX.get(charset.toLowerCase());
+            return (i == null) ? 0 : i.intValue();
+        }
+    }
+
+    /**
+     * 'utf8' COLLATE 'utf8_general_ci'
+     * 
+     * @param charset
+     * @return
+     */
+    public static final String collateCharset(String charset) {
+        String[] output = StringUtils.split(charset, "COLLATE");
+        return output[0].replace('\'', ' ').trim();
+    }
+
+    public static String getJavaCharset(String charset) {
+        if ("utf8".equals(charset)) {
+            return charset;
+        }
+
+        if (StringUtils.endsWithIgnoreCase(charset, "utf8mb4")) {
+            return "utf-8";
+        }
+
+        if (StringUtils.endsWithIgnoreCase(charset, "binary")) {
+            return "iso_8859_1";
+        }
+
+        return charset;
+    }
+}

+ 51 - 0
driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/CharsetUtilTest.java

@@ -0,0 +1,51 @@
+package com.alibaba.otter.canal.parse.driver.mysql;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.parse.driver.mysql.utils.CharsetUtil;
+
+public class CharsetUtilTest {
+
+    @Test
+    public void testLatin1() {
+        int charsetIndex = 5;
+        String charset = "latin1";
+        Assert.assertTrue(charset.equals(CharsetUtil.getCharset(charsetIndex)));
+    }
+
+    @Test
+    public void testGbk() {
+        int charsetIndex = 87;
+        String charset = "gbk";
+        Assert.assertTrue(charset.equals(CharsetUtil.getCharset(charsetIndex)));
+    }
+
+    @Test
+    public void testGb2312() {
+        int charsetIndex = 24;
+        String charset = "gb2312";
+        Assert.assertTrue(charset.equals(CharsetUtil.getCharset(charsetIndex)));
+    }
+
+    @Test
+    public void testUtf8() {
+        int charsetIndex = 213;
+        String charset = "utf8";
+        Assert.assertTrue(charset.equals(CharsetUtil.getCharset(charsetIndex)));
+    }
+
+    @Test
+    public void testUtf8mb4() {
+        int charsetIndex = 235;
+        String charset = "utf8mb4";
+        Assert.assertTrue(charset.equals(CharsetUtil.getCharset(charsetIndex)));
+    }
+
+    @Test
+    public void testBinary() {
+        int charsetIndex = 63;
+        String charset = "binary";
+        Assert.assertTrue(charset.equals(CharsetUtil.getCharset(charsetIndex)));
+    }
+}

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -275,6 +275,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                         needDmlParse = true;
                         break;
                     case LogEvent.UPDATE_ROWS_EVENT_V1:
+                    case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
                     case LogEvent.UPDATE_ROWS_EVENT:
                         tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
                         needDmlParse = true;

+ 38 - 18
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -120,6 +120,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             case LogEvent.WRITE_ROWS_EVENT:
                 return parseRowsEvent((WriteRowsLogEvent) logEvent);
             case LogEvent.UPDATE_ROWS_EVENT_V1:
+            case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
             case LogEvent.UPDATE_ROWS_EVENT:
                 return parseRowsEvent((UpdateRowsLogEvent) logEvent);
             case LogEvent.DELETE_ROWS_EVENT_V1:
@@ -261,7 +262,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             }
 
             boolean isDml = (type == EventType.INSERT || type == EventType.UPDATE || type == EventType.DELETE);
-            
+
             if (!isSeek && !isDml) {
                 // 使用新的表结构元数据管理方式
                 EntryPosition position = createPosition(event.getHeader());
@@ -504,7 +505,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             int type = event.getHeader().getType();
             if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type) {
                 eventType = EventType.INSERT;
-            } else if (LogEvent.UPDATE_ROWS_EVENT_V1 == type || LogEvent.UPDATE_ROWS_EVENT == type) {
+            } else if (LogEvent.UPDATE_ROWS_EVENT_V1 == type || LogEvent.UPDATE_ROWS_EVENT == type
+                       || LogEvent.PARTIAL_UPDATE_ROWS_EVENT == type) {
                 eventType = EventType.UPDATE;
             } else if (LogEvent.DELETE_ROWS_EVENT_V1 == type || LogEvent.DELETE_ROWS_EVENT == type) {
                 eventType = EventType.DELETE;
@@ -523,7 +525,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
             boolean tableError = false;
             int rowsCount = 0;
-            while (buffer.nextOneRow(columns)) {
+            while (buffer.nextOneRow(columns, false)) {
                 // 处理row记录
                 RowData.Builder rowDataBuilder = RowData.newBuilder();
                 if (EventType.INSERT == eventType) {
@@ -535,7 +537,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 } else {
                     // update需要处理before/after
                     tableError |= parseOneRow(rowDataBuilder, event, buffer, columns, false, tableMeta);
-                    if (!buffer.nextOneRow(changeColumns)) {
+                    if (!buffer.nextOneRow(changeColumns, true)) {
                         rowChangeBuider.addRowDatas(rowDataBuilder.build());
                         break;
                     }
@@ -577,7 +579,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                                 boolean isAfter, TableMeta tableMeta) throws UnsupportedEncodingException {
         int columnCnt = event.getTable().getColumnCnt();
         ColumnInfo[] columnInfo = event.getTable().getColumnInfo();
-
+        // mysql8.0针对set @@global.binlog_row_metadata='FULL' 可以记录部分的metadata信息
+        boolean existOptionalMetaData = event.getTable().isExistOptionalMetaData();
         boolean tableError = false;
         // check table fileds count,只能处理加字段
         boolean existRDSNoPrimaryKey = false;
@@ -631,22 +634,43 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 continue;
             }
 
+            FieldMeta fieldMeta = null;
+            if (tableMeta != null && !tableError) {
+                // 处理file meta
+                fieldMeta = tableMeta.getFields().get(i);
+            }
+
             if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
                 // 不解析最后一列
-                buffer.nextValue(info.type, info.meta, false);
+                buffer.nextValue(fieldMeta.getColumnName(), i, info.type, info.meta, false);
                 continue;
             }
 
-            Column.Builder columnBuilder = Column.newBuilder();
+            if (fieldMeta != null && existOptionalMetaData) {
+                // check column info
+                boolean check = StringUtils.equalsIgnoreCase(fieldMeta.getColumnName(), info.name);
+                check &= (fieldMeta.isUnsigned() == info.unsigned);
+                check &= (fieldMeta.isNullable() == info.nullable);
+
+                if (!check) {
+                    throw new CanalParseException("MySQL8.0 unmatch column metadata & pls submit issue , db : "
+                                                  + fieldMeta.toString() + " , binlog : " + info.toString()
+                                                  + " , on : " + event.getHeader().getLogFileName() + ":"
+                                                  + event.getHeader().getLogPos());
+                }
+            }
 
-            FieldMeta fieldMeta = null;
-            if (tableMeta != null && !tableError) {
-                // 处理file meta
-                fieldMeta = tableMeta.getFields().get(i);
+            Column.Builder columnBuilder = Column.newBuilder();
+            if (fieldMeta != null) {
                 columnBuilder.setName(fieldMeta.getColumnName());
                 columnBuilder.setIsKey(fieldMeta.isKey());
                 // 增加mysql type类型,issue 73
                 columnBuilder.setMysqlType(fieldMeta.getColumnType());
+            } else if (existOptionalMetaData) {
+                columnBuilder.setName(info.name);
+                columnBuilder.setIsKey(info.pk);
+                // mysql8.0里没有mysql type类型
+                // columnBuilder.setMysqlType(fieldMeta.getColumnType());
             }
             columnBuilder.setIndex(i);
             columnBuilder.setIsNull(false);
@@ -664,12 +688,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                     isSingleBit = true;
                 }
             }
-            buffer.nextValue(info.type, info.meta, isBinary);
-            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
-                // 不解析最后一列
-                continue;
-            }
 
+            buffer.nextValue(fieldMeta.getColumnName(), i, info.type, info.meta, isBinary);
             int javaType = buffer.getJavaType();
             if (buffer.isNull()) {
                 columnBuilder.setIsNull(true);
@@ -683,7 +703,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                     case Types.BIGINT:
                         // 处理unsigned类型
                         Number number = (Number) value;
-                        if (fieldMeta != null && fieldMeta.isUnsigned() && number.longValue() < 0) {
+                        boolean isUnsigned = (fieldMeta != null ? fieldMeta.isUnsigned() : (existOptionalMetaData ? info.unsigned : false));
+                        if (isUnsigned && number.longValue() < 0) {
                             switch (buffer.getLength()) {
                                 case 1: /* MYSQL_TYPE_TINY */
                                     columnBuilder.setValue(String.valueOf(Integer.valueOf(TINYINT_MAX_VALUE
@@ -772,7 +793,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                     default:
                         columnBuilder.setValue(value.toString());
                 }
-
             }
 
             columnBuilder.setSqlType(javaType);

+ 118 - 6
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -1,7 +1,11 @@
 package com.alibaba.otter.canal.parse;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
@@ -24,12 +28,25 @@ import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer;
+import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
+import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
 
 public class DirectLogFetcherTest {
 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+    protected String       binlogFileName = "mysql-bin.000001";
+    protected Charset      charset        = Charset.forName("utf-8");
     private boolean        isMariaDB;
     private int            binlogChecksum;
 
@@ -37,7 +54,7 @@ public class DirectLogFetcherTest {
     public void testSimple() {
         DirectLogFetcher fetcher = new DirectLogFetcher();
         try {
-            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "xxxx", "xxxx");
+            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
             connector.connect();
             updateSettings(connector);
             loadBinlogChecksum(connector);
@@ -67,21 +84,22 @@ public class DirectLogFetcherTest {
                         break;
                     case LogEvent.WRITE_ROWS_EVENT_V1:
                     case LogEvent.WRITE_ROWS_EVENT:
-                        // parseRowsEvent((WriteRowsLogEvent) event);
+                        parseRowsEvent((WriteRowsLogEvent) event);
                         break;
                     case LogEvent.UPDATE_ROWS_EVENT_V1:
+                    case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
                     case LogEvent.UPDATE_ROWS_EVENT:
-                        // parseRowsEvent((UpdateRowsLogEvent) event);
+                        parseRowsEvent((UpdateRowsLogEvent) event);
                         break;
                     case LogEvent.DELETE_ROWS_EVENT_V1:
                     case LogEvent.DELETE_ROWS_EVENT:
-                        // parseRowsEvent((DeleteRowsLogEvent) event);
+                        parseRowsEvent((DeleteRowsLogEvent) event);
                         break;
                     case LogEvent.QUERY_EVENT:
-                        // parseQueryEvent((QueryLogEvent) event);
+                        parseQueryEvent((QueryLogEvent) event);
                         break;
                     case LogEvent.ROWS_QUERY_LOG_EVENT:
-                        // parseRowsQueryEvent((RowsQueryLogEvent) event);
+                        parseRowsQueryEvent((RowsQueryLogEvent) event);
                         break;
                     case LogEvent.ANNOTATE_ROWS_EVENT:
                         break;
@@ -241,4 +259,98 @@ public class DirectLogFetcherTest {
         exector.update(cmd);
     }
 
+    protected void parseQueryEvent(QueryLogEvent event) {
+        System.out.println(String.format("================> binlog[%s:%s] , name[%s]",
+            binlogFileName,
+            event.getHeader().getLogPos() - event.getHeader().getEventLen(),
+            event.getCatalog()));
+        System.out.println("sql : " + event.getQuery());
+    }
+
+    protected void parseRowsQueryEvent(RowsQueryLogEvent event) throws Exception {
+        System.out.println(String.format("================> binlog[%s:%s]", binlogFileName, event.getHeader()
+            .getLogPos() - event.getHeader().getEventLen()));
+        System.out.println("sql : " + new String(event.getRowsQuery().getBytes("ISO-8859-1"), charset.name()));
+    }
+
+    protected void parseAnnotateRowsEvent(AnnotateRowsEvent event) throws Exception {
+        System.out.println(String.format("================> binlog[%s:%s]", binlogFileName, event.getHeader()
+            .getLogPos() - event.getHeader().getEventLen()));
+        System.out.println("sql : " + new String(event.getRowsQuery().getBytes("ISO-8859-1"), charset.name()));
+    }
+
+    protected void parseXidEvent(XidLogEvent event) throws Exception {
+        System.out.println(String.format("================> binlog[%s:%s]", binlogFileName, event.getHeader()
+            .getLogPos() - event.getHeader().getEventLen()));
+        System.out.println("xid : " + event.getXid());
+    }
+
+    protected void parseRowsEvent(RowsLogEvent event) {
+        try {
+            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s]",
+                binlogFileName,
+                event.getHeader().getLogPos() - event.getHeader().getEventLen(),
+                event.getTable().getDbName(),
+                event.getTable().getTableName()));
+            RowsLogBuffer buffer = event.getRowsBuf(charset.name());
+            BitSet columns = event.getColumns();
+            BitSet changeColumns = event.getChangeColumns();
+            while (buffer.nextOneRow(columns)) {
+                // 处理row记录
+                int type = event.getHeader().getType();
+                if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type) {
+                    // insert的记录放在before字段中
+                    parseOneRow(event, buffer, columns, true);
+                } else if (LogEvent.DELETE_ROWS_EVENT_V1 == type || LogEvent.DELETE_ROWS_EVENT == type) {
+                    // delete的记录放在before字段中
+                    parseOneRow(event, buffer, columns, false);
+                } else {
+                    // update需要处理before/after
+                    System.out.println("-------> before");
+                    parseOneRow(event, buffer, columns, false);
+                    if (!buffer.nextOneRow(changeColumns, true)) {
+                        break;
+                    }
+                    System.out.println("-------> after");
+                    parseOneRow(event, buffer, changeColumns, true);
+                }
+
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("parse row data failed.", e);
+        }
+    }
+
+    protected void parseOneRow(RowsLogEvent event, RowsLogBuffer buffer, BitSet cols, boolean isAfter)
+                                                                                                      throws UnsupportedEncodingException {
+        TableMapLogEvent map = event.getTable();
+        if (map == null) {
+            throw new RuntimeException("not found TableMap with tid=" + event.getTableId());
+        }
+
+        final int columnCnt = map.getColumnCnt();
+        final ColumnInfo[] columnInfo = map.getColumnInfo();
+
+        for (int i = 0; i < columnCnt; i++) {
+            if (!cols.get(i)) {
+                continue;
+            }
+
+            ColumnInfo info = columnInfo[i];
+            buffer.nextValue(null, i, info.type, info.meta);
+
+            if (buffer.isNull()) {
+                //
+            } else {
+                final Serializable value = buffer.getValue();
+                if (value instanceof byte[]) {
+                    System.out.println(new String((byte[]) value));
+                } else {
+                    System.out.println(value);
+                }
+            }
+        }
+
+    }
+
 }

+ 3 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java

@@ -89,6 +89,7 @@ public class MysqlBinlogParsePerformanceTest {
                     parseRowsEvent((WriteRowsLogEvent) event, sum);
                     break;
                 case LogEvent.UPDATE_ROWS_EVENT_V1:
+                case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
                 case LogEvent.UPDATE_ROWS_EVENT:
                     parseRowsEvent((UpdateRowsLogEvent) event, sum);
                     break;
@@ -154,7 +155,7 @@ public class MysqlBinlogParsePerformanceTest {
                     parseOneRow(event, buffer, columns, false);
                 } else {
                     parseOneRow(event, buffer, columns, false);
-                    if (!buffer.nextOneRow(changeColumns)) {
+                    if (!buffer.nextOneRow(changeColumns, true)) {
                         break;
                     }
                     parseOneRow(event, buffer, changeColumns, true);
@@ -182,7 +183,7 @@ public class MysqlBinlogParsePerformanceTest {
             }
 
             ColumnInfo info = columnInfo[i];
-            buffer.nextValue(info.type, info.meta);
+            buffer.nextValue(null, i, info.type, info.meta);
             if (buffer.isNull()) {
             } else {
                 buffer.getValue();