Bläddra i källkod

fixed issue #5017 , json partial update fixed & format

jianghang.loujh 1 år sedan
förälder
incheckning
04c11ec8eb

+ 6 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java

@@ -73,12 +73,13 @@ public class JsonDiffConversion {
                 buffer.forward((int) value_length);
             }
 
-            if (buffer.position - position >= len) {
+            // see https://github.com/alibaba/canal/pull/5018
+            if (buffer.position() - position >= len) {
                 break;
             }
         }
 
-        if (buffer.position - position != len) {
+        if (buffer.position() - position != len) {
             throw new IllegalArgumentException("reading json diff");
         }
 
@@ -137,7 +138,8 @@ public class JsonDiffConversion {
                 builder.append(jsonBuilder);
             }
 
-            if (buffer.position - position >= len) {
+            // see https://github.com/alibaba/canal/pull/5018
+            if (buffer.position() - position >= len) {
                 builder.append(")");
                 break;
             }
@@ -153,7 +155,7 @@ public class JsonDiffConversion {
             diff_i++;
         }
 
-        if (buffer.position - position != len) {
+        if (buffer.position() - position != len) {
             throw new IllegalArgumentException("reading json diff");
         }
 

+ 66 - 95
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -4,8 +4,6 @@ import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.sql.Timestamp;
 import java.sql.Types;
-import java.time.ZoneId;
-import java.time.zone.ZoneRules;
 import java.util.BitSet;
 
 import org.apache.commons.logging.Log;
@@ -32,27 +30,27 @@ public final class RowsLogBuffer {
     public static final Integer[] integerCache      = new Integer[1024 * 128];
     public static final int       integerCacheLimit = longCache.length + 127;
 
-    public static final long   DATETIMEF_INT_OFS = 0x8000000000L;
-    public static final long   TIMEF_INT_OFS     = 0x800000L;
-    public static final long   TIMEF_OFS         = 0x800000000000L;
-    private static char[]      digits            = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
+    public static final long      DATETIMEF_INT_OFS = 0x8000000000L;
+    public static final long      TIMEF_INT_OFS     = 0x800000L;
+    public static final long      TIMEF_OFS         = 0x800000000000L;
+    private static char[]         digits            = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
 
-    private final LogBuffer    buffer;
-    private final int          columnLen;
-    private final int          jsonColumnCount;
-    private final Charset      charset;
+    private final LogBuffer       buffer;
+    private final int             columnLen;
+    private final int             jsonColumnCount;
+    private final Charset         charset;
 
-    private final BitSet       nullBits;
-    private int                nullBitIndex;
+    private final BitSet          nullBits;
+    private int                   nullBitIndex;
 
     // Read value_options if this is AI for PARTIAL_UPDATE_ROWS_EVENT
-    private final boolean      partial;
-    private final BitSet       partialBits;
+    private final boolean         partial;
+    private final BitSet          partialBits;
 
-    private boolean            fNull;
-    private int                javaType;
-    private int                length;
-    private Serializable       value;
+    private boolean               fNull;
+    private int                   javaType;
+    private int                   length;
+    private Serializable          value;
 
     public RowsLogBuffer(LogBuffer buffer, final int columnLen, Charset charset, int jsonColumnCount, boolean partial){
         this.buffer = buffer;
@@ -71,8 +69,7 @@ public final class RowsLogBuffer {
     /**
      * Extracting next row from packed buffer.
      *
-     * @see mysql-5.1.60/sql/log_event.cc -
-     * Rows_log_event::print_verbose_one_row
+     * @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row
      */
     public final boolean nextOneRow(BitSet columns, boolean after) {
         final boolean hasOneRow = buffer.hasRemaining();
@@ -105,8 +102,7 @@ public final class RowsLogBuffer {
     /**
      * Extracting next field value from packed buffer.
      *
-     * @see mysql-5.1.60/sql/log_event.cc -
-     * Rows_log_event::print_verbose_one_row
+     * @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row
      */
     public final Serializable nextValue(final String columName, final int columnIndex, final int type, final int meta) {
         return nextValue(columName, columnIndex, type, meta, false);
@@ -115,8 +111,7 @@ public final class RowsLogBuffer {
     /**
      * Extracting next field value from packed buffer.
      *
-     * @see mysql-5.1.60/sql/log_event.cc -
-     * Rows_log_event::print_verbose_one_row
+     * @see mysql-5.1.60/sql/log_event.cc - Rows_log_event::print_verbose_one_row
      */
     public final Serializable nextValue(final String columName, final int columnIndex, final int type, final int meta,
                                         boolean isBinary) {
@@ -300,10 +295,8 @@ public final class RowsLogBuffer {
                             len = byte1;
                             break;
                         default:
-                            throw new IllegalArgumentException(String.format("!! Don't know how to handle column type=%d meta=%d (%04X)",
-                                type,
-                                meta,
-                                meta));
+                            throw new IllegalArgumentException(String
+                                .format("!! Don't know how to handle column type=%d meta=%d (%04X)", type, meta, meta));
                     }
                 }
             } else {
@@ -359,8 +352,8 @@ public final class RowsLogBuffer {
             }
             case LogEvent.MYSQL_TYPE_DECIMAL: {
                 /*
-                 * log_event.h : This enumeration value is only used internally
-                 * and cannot exist in a binlog.
+                 * log_event.h : This enumeration value is only used internally and cannot exist
+                 * in a binlog.
                  */
                 logger.warn("MYSQL_TYPE_DECIMAL : This enumeration value is "
                             + "only used internally and cannot exist in a binlog!");
@@ -541,12 +534,10 @@ public final class RowsLogBuffer {
             }
             case LogEvent.MYSQL_TYPE_DATETIME2: {
                 /*
-                 * DATETIME and DATE low-level memory and disk representation
-                 * routines 1 bit sign (used when on disk) 17 bits year*13+month
-                 * (year 0-9999, month 0-12) 5 bits day (0-31) 5 bits hour
-                 * (0-23) 6 bits minute (0-59) 6 bits second (0-59) 24 bits
-                 * microseconds (0-999999) Total: 64 bits = 8 bytes
-                 * SYYYYYYY.YYYYYYYY
+                 * DATETIME and DATE low-level memory and disk representation routines 1 bit
+                 * sign (used when on disk) 17 bits year*13+month (year 0-9999, month 0-12) 5
+                 * bits day (0-31) 5 bits hour (0-23) 6 bits minute (0-59) 6 bits second (0-59)
+                 * 24 bits microseconds (0-999999) Total: 64 bits = 8 bytes SYYYYYYY.YYYYYYYY
                  * .YYdddddh.hhhhmmmm.mmssssss.ffffffff.ffffffff.ffffffff
                  */
                 long intpart = buffer.getBeUlong40() - DATETIMEF_INT_OFS; // big-endian
@@ -665,12 +656,10 @@ public final class RowsLogBuffer {
             }
             case LogEvent.MYSQL_TYPE_TIME2: {
                 /*
-                 * TIME low-level memory and disk representation routines
-                 * In-memory format: 1 bit sign (Used for sign, when on disk) 1
-                 * bit unused (Reserved for wider hour range, e.g. for
-                 * intervals) 10 bit hour (0-836) 6 bit minute (0-59) 6 bit
-                 * second (0-59) 24 bits microseconds (0-999999) Total: 48 bits
-                 * = 6 bytes
+                 * TIME low-level memory and disk representation routines In-memory format: 1
+                 * bit sign (Used for sign, when on disk) 1 bit unused (Reserved for wider hour
+                 * range, e.g. for intervals) 10 bit hour (0-836) 6 bit minute (0-59) 6 bit
+                 * second (0-59) 24 bits microseconds (0-999999) Total: 48 bits = 6 bytes
                  * Suhhhhhh.hhhhmmmm.mmssssss.ffffffff.ffffffff.ffffffff
                  */
                 long intpart = 0;
@@ -687,20 +676,15 @@ public final class RowsLogBuffer {
                         frac = buffer.getUint8();
                         if (intpart < 0 && frac > 0) {
                             /*
-                             * Negative values are stored with reverse
-                             * fractional part order, for binary sort
-                             * compatibility. Disk value intpart frac Time value
-                             * Memory value 800000.00 0 0 00:00:00.00
-                             * 0000000000.000000 7FFFFF.FF -1 255 -00:00:00.01
-                             * FFFFFFFFFF.FFD8F0 7FFFFF.9D -1 99 -00:00:00.99
-                             * FFFFFFFFFF.F0E4D0 7FFFFF.00 -1 0 -00:00:01.00
-                             * FFFFFFFFFF.000000 7FFFFE.FF -1 255 -00:00:01.01
-                             * FFFFFFFFFE.FFD8F0 7FFFFE.F6 -2 246 -00:00:01.10
-                             * FFFFFFFFFE.FE7960 Formula to convert fractional
-                             * part from disk format (now stored in "frac"
-                             * variable) to absolute value: "0x100 - frac". To
-                             * reconstruct in-memory value, we shift to the next
-                             * integer value and then substruct fractional part.
+                             * Negative values are stored with reverse fractional part order, for binary
+                             * sort compatibility. Disk value intpart frac Time value Memory value 800000.00
+                             * 0 0 00:00:00.00 0000000000.000000 7FFFFF.FF -1 255 -00:00:00.01
+                             * FFFFFFFFFF.FFD8F0 7FFFFF.9D -1 99 -00:00:00.99 FFFFFFFFFF.F0E4D0 7FFFFF.00 -1
+                             * 0 -00:00:01.00 FFFFFFFFFF.000000 7FFFFE.FF -1 255 -00:00:01.01
+                             * FFFFFFFFFE.FFD8F0 7FFFFE.F6 -2 246 -00:00:01.10 FFFFFFFFFE.FE7960 Formula to
+                             * convert fractional part from disk format (now stored in "frac" variable) to
+                             * absolute value: "0x100 - frac". To reconstruct in-memory value, we shift to
+                             * the next integer value and then substruct fractional part.
                              */
                             intpart++; /* Shift to the next integer value */
                             frac -= 0x100; /* -(0x100 - frac) */
@@ -715,9 +699,8 @@ public final class RowsLogBuffer {
                         frac = buffer.getBeUint16();
                         if (intpart < 0 && frac > 0) {
                             /*
-                             * Fix reverse fractional part order:
-                             * "0x10000 - frac". See comments for FSP=1 and
-                             * FSP=2 above.
+                             * Fix reverse fractional part order: "0x10000 - frac". See comments for FSP=1
+                             * and FSP=2 above.
                              */
                             intpart++; /* Shift to the next integer value */
                             frac -= 0x10000; /* -(0x10000-frac) */
@@ -789,8 +772,8 @@ public final class RowsLogBuffer {
             }
             case LogEvent.MYSQL_TYPE_NEWDATE: {
                 /*
-                 * log_event.h : This enumeration value is only used internally
-                 * and cannot exist in a binlog.
+                 * log_event.h : This enumeration value is only used internally and cannot exist
+                 * in a binlog.
                  */
                 logger.warn("MYSQL_TYPE_NEWDATE : This enumeration value is "
                             + "only used internally and cannot exist in a binlog!");
@@ -862,8 +845,8 @@ public final class RowsLogBuffer {
             case LogEvent.MYSQL_TYPE_ENUM: {
                 final int int32;
                 /*
-                 * log_event.h : This enumeration value is only used internally
-                 * and cannot exist in a binlog.
+                 * log_event.h : This enumeration value is only used internally and cannot exist
+                 * in a binlog.
                  */
                 switch (len) {
                     case 1:
@@ -929,24 +912,24 @@ public final class RowsLogBuffer {
             }
             case LogEvent.MYSQL_TYPE_TINY_BLOB: {
                 /*
-                 * log_event.h : This enumeration value is only used internally
-                 * and cannot exist in a binlog.
+                 * log_event.h : This enumeration value is only used internally and cannot exist
+                 * in a binlog.
                  */
                 logger.warn("MYSQL_TYPE_TINY_BLOB : This enumeration value is "
                             + "only used internally and cannot exist in a binlog!");
             }
             case LogEvent.MYSQL_TYPE_MEDIUM_BLOB: {
                 /*
-                 * log_event.h : This enumeration value is only used internally
-                 * and cannot exist in a binlog.
+                 * log_event.h : This enumeration value is only used internally and cannot exist
+                 * in a binlog.
                  */
                 logger.warn("MYSQL_TYPE_MEDIUM_BLOB : This enumeration value is "
                             + "only used internally and cannot exist in a binlog!");
             }
             case LogEvent.MYSQL_TYPE_LONG_BLOB: {
                 /*
-                 * log_event.h : This enumeration value is only used internally
-                 * and cannot exist in a binlog.
+                 * log_event.h : This enumeration value is only used internally and cannot exist
+                 * in a binlog.
                  */
                 logger.warn("MYSQL_TYPE_LONG_BLOB : This enumeration value is "
                             + "only used internally and cannot exist in a binlog!");
@@ -1005,8 +988,7 @@ public final class RowsLogBuffer {
             case LogEvent.MYSQL_TYPE_VAR_STRING: {
                 /*
                  * Except for the data length calculation, MYSQL_TYPE_VARCHAR,
-                 * MYSQL_TYPE_VAR_STRING and MYSQL_TYPE_STRING are handled the
-                 * same way.
+                 * MYSQL_TYPE_VAR_STRING and MYSQL_TYPE_STRING are handled the same way.
                  */
                 len = meta;
                 if (len < 256) {
@@ -1078,11 +1060,9 @@ public final class RowsLogBuffer {
                     // print_json_diff
                     int position = buffer.position();
                     try {
-                        StringBuilder builder = JsonDiffConversion.print_json_diff(buffer,
-                                len,
-                                columnName,
-                                columnIndex,
-                                charset);
+                        // https://github.com/alibaba/canal/pull/5018
+                        StringBuilder builder = JsonDiffConversion
+                            .print_json_diff(buffer, len, columnName, columnIndex, charset);
                         value = builder.toString();
                         buffer.position(position + len);
                     } catch (IllegalArgumentException e) {
@@ -1120,24 +1100,19 @@ public final class RowsLogBuffer {
                 /* fill binary */
                 byte[] binary = new byte[len];
                 buffer.fillBytes(binary, 0, len);
-
-                /* Warning unsupport cloumn type */
-                // logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY: meta=%d (%04X), len = %d",
-                // meta,
-                // meta,
-                // len));
+                // Warning unsupport cloumn type
+                // logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY:
+                // meta=%d (%04X), len = %d", meta,meta, len));
                 javaType = Types.BINARY;
                 value = binary;
                 length = len;
                 break;
             }
-            case LogEvent.MYSQL_TYPE_BOOL :
-            case LogEvent.MYSQL_TYPE_INVALID :
+            case LogEvent.MYSQL_TYPE_BOOL:
+            case LogEvent.MYSQL_TYPE_INVALID:
             default:
-                logger.error(String.format("!! Don't know how to handle column type=%d meta=%d (%04X)",
-                    type,
-                    meta,
-                    meta));
+                logger.error(
+                    String.format("!! Don't know how to handle column type=%d meta=%d (%04X)", type, meta, meta));
                 javaType = Types.OTHER;
                 value = null;
                 length = 0;
@@ -1148,16 +1123,12 @@ public final class RowsLogBuffer {
 
     private void parseJsonFromFullValue(int len) {
         if (0 == len) {
-            // fixed issue #1 by lava, json column of zero length
-            // has no
-            // value, value parsing should be skipped
+            // fixed issue #1 by lava, json column of zero length has no value, value
+            // parsing should be skipped
             value = "";
         } else {
             int position = buffer.position();
-            Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(),
-                buffer,
-                len - 1,
-                    charset);
+            Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(), buffer, len - 1, charset);
             StringBuilder builder = new StringBuilder();
             jsonValue.toJsonString(builder, charset);
             value = builder.toString();