Переглянути джерело

performance optimize , 1. cache string names 2. cache Charset

jianghang.loujh 2 роки тому
батько
коміт
f7a4979b15
21 змінених файлів з 225 додано та 141 видалено
  1. 19 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/CharsetConversion.java
  2. 24 18
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonConversion.java
  3. 8 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java
  4. 114 73
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java
  5. 0 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/NameCache.java
  6. 10 7
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java
  7. 3 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/HeartbeatLogEvent.java
  8. 1 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/IncidentLogEvent.java
  9. 12 12
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java
  10. 9 8
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java
  11. 3 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java
  12. 3 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsQueryLogEvent.java
  13. 1 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/StartLogEventV3.java
  14. 2 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java
  15. 3 2
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/UserVarLogEvent.java
  16. 3 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/AnnotateRowsEvent.java
  17. 1 1
      dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/BaseLogFetcherTest.java
  18. 1 1
      meta/src/main/java/com/alibaba/otter/canal/meta/FileMixedMetaManager.java
  19. 6 6
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  20. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java
  21. 1 1
      parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java

+ 19 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/CharsetConversion.java

@@ -21,6 +21,7 @@ public final class CharsetConversion {
         protected final String mysqlCharset;
         protected final String mysqlCollation;
         protected final String javaCharset;
+        protected final Charset charset;
 
         Entry(final int id, String mysqlCharset, // NL
               String mysqlCollation, String javaCharset){
@@ -28,6 +29,7 @@ public final class CharsetConversion {
             this.mysqlCharset = mysqlCharset;
             this.mysqlCollation = mysqlCollation;
             this.javaCharset = javaCharset;
+            this.charset = Charset.isSupported(javaCharset) ? Charset.forName(javaCharset) : null;
         }
     }
 
@@ -390,6 +392,23 @@ public final class CharsetConversion {
         }
     }
 
+    public static Charset getNioCharset(final int id) {
+        Entry entry = getEntry(id);
+
+        if (entry != null) {
+            if (entry.charset != null) {
+                return entry.charset;
+            } else {
+                logger.warn("Unknown java charset for: id = " + id + ", name = " + entry.mysqlCharset + ", coll = "
+                            + entry.mysqlCollation);
+                return null;
+            }
+        } else {
+            logger.warn("Unexpect mysql charset: " + id);
+            return null;
+        }
+    }
+
     public static void main(String[] args) {
         for (int i = 0; i < entries.length; i++) {
             Entry entry = entries[i];

+ 24 - 18
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonConversion.java

@@ -1,5 +1,7 @@
 package com.taobao.tddl.dbsync.binlog;
 
+import java.nio.charset.Charset;
+
 import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.appendNumber2;
 import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.appendNumber4;
 import static com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer.usecondsToStr;
@@ -57,23 +59,27 @@ public class JsonConversion {
     public static final int  VALUE_ENTRY_SIZE_LARGE  = (1 + LARGE_OFFSET_SIZE);
 
     public static Json_Value parse_value(int type, LogBuffer buffer, long len, String charsetName) {
+        return parse_value(type, buffer, len, Charset.forName(charsetName));
+    }
+
+    public static Json_Value parse_value(int type, LogBuffer buffer, long len, Charset charset) {
         buffer = buffer.duplicate(buffer.position(), (int) len);
         switch (type) {
             case JSONB_TYPE_SMALL_OBJECT:
-                return parse_array_or_object(Json_enum_type.OBJECT, buffer, len, false, charsetName);
+                return parse_array_or_object(Json_enum_type.OBJECT, buffer, len, false, charset);
             case JSONB_TYPE_LARGE_OBJECT:
-                return parse_array_or_object(Json_enum_type.OBJECT, buffer, len, true, charsetName);
+                return parse_array_or_object(Json_enum_type.OBJECT, buffer, len, true, charset);
             case JSONB_TYPE_SMALL_ARRAY:
-                return parse_array_or_object(Json_enum_type.ARRAY, buffer, len, false, charsetName);
+                return parse_array_or_object(Json_enum_type.ARRAY, buffer, len, false, charset);
             case JSONB_TYPE_LARGE_ARRAY:
-                return parse_array_or_object(Json_enum_type.ARRAY, buffer, len, true, charsetName);
+                return parse_array_or_object(Json_enum_type.ARRAY, buffer, len, true, charset);
             default:
-                return parse_scalar(type, buffer, len, charsetName);
+                return parse_scalar(type, buffer, len, charset);
         }
     }
 
     private static Json_Value parse_array_or_object(Json_enum_type type, LogBuffer buffer, long len, boolean large,
-                                                    String charsetName) {
+                                                    Charset charset) {
         long offset_size = large ? LARGE_OFFSET_SIZE : SMALL_OFFSET_SIZE;
         if (len < 2 * offset_size) {
             throw new IllegalArgumentException("illegal json data");
@@ -100,7 +106,7 @@ public class JsonConversion {
         return large ? buffer.getUint32() : buffer.getUint16();
     }
 
-    private static Json_Value parse_scalar(int type, LogBuffer buffer, long len, String charsetName) {
+    private static Json_Value parse_scalar(int type, LogBuffer buffer, long len, Charset charset) {
         switch (type) {
             case JSONB_TYPE_LITERAL:
                 /* purecov: inspected */
@@ -156,7 +162,7 @@ public class JsonConversion {
                 }
                 return new Json_Value(Json_enum_type.STRING, buffer.rewind()
                     .forward((int) n)
-                    .getFixString((int) str_len, charsetName));
+                    .getFixString((int) str_len, charset));
             case JSONB_TYPE_OPAQUE:
                 /*
                  * There should always be at least one byte, which tells the
@@ -242,7 +248,7 @@ public class JsonConversion {
             this.m_large = large;
         }
 
-        public String key(int i, String charsetName) {
+        public String key(int i, Charset charset) {
             m_data.rewind();
             int offset_size = m_large ? LARGE_OFFSET_SIZE : SMALL_OFFSET_SIZE;
             int key_entry_size = m_large ? KEY_ENTRY_SIZE_LARGE : KEY_ENTRY_SIZE_SMALL;
@@ -255,10 +261,10 @@ public class JsonConversion {
             // entry, always two
             // bytes.
             long key_length = m_data.getUint16();
-            return m_data.rewind().forward((int) key_offset).getFixString((int) key_length, charsetName);
+            return m_data.rewind().forward((int) key_offset).getFixString((int) key_length, charset);
         }
 
-        public Json_Value element(int i, String charsetName) {
+        public Json_Value element(int i, Charset charset) {
             m_data.rewind();
             int offset_size = m_large ? LARGE_OFFSET_SIZE : SMALL_OFFSET_SIZE;
             int key_entry_size = m_large ? KEY_ENTRY_SIZE_LARGE : KEY_ENTRY_SIZE_SMALL;
@@ -271,13 +277,13 @@ public class JsonConversion {
             int type = m_data.forward(entry_offset).getUint8();
             if (type == JSONB_TYPE_INT16 || type == JSONB_TYPE_UINT16 || type == JSONB_TYPE_LITERAL
                 || (m_large && (type == JSONB_TYPE_INT32 || type == JSONB_TYPE_UINT32))) {
-                return parse_scalar(type, m_data, value_entry_size - 1, charsetName);
+                return parse_scalar(type, m_data, value_entry_size - 1, charset);
             }
             int value_offset = (int) read_offset_or_size(m_data, m_large);
-            return parse_value(type, m_data.rewind().forward(value_offset), (int) m_length - value_offset, charsetName);
+            return parse_value(type, m_data.rewind().forward(value_offset), (int) m_length - value_offset, charset);
         }
 
-        public StringBuilder toJsonString(StringBuilder buf, String charsetName) {
+        public StringBuilder toJsonString(StringBuilder buf, Charset charset) {
             switch (m_type) {
                 case OBJECT:
                     buf.append("{");
@@ -285,9 +291,9 @@ public class JsonConversion {
                         if (i > 0) {
                             buf.append(", ");
                         }
-                        buf.append('"').append(key(i, charsetName)).append('"');
+                        buf.append('"').append(key(i, charset)).append('"');
                         buf.append(": ");
-                        element(i, charsetName).toJsonString(buf, charsetName);
+                        element(i, charset).toJsonString(buf, charset);
                     }
                     buf.append("}");
                     break;
@@ -297,7 +303,7 @@ public class JsonConversion {
                         if (i > 0) {
                             buf.append(", ");
                         }
-                        element(i, charsetName).toJsonString(buf, charsetName);
+                        element(i, charset).toJsonString(buf, charset);
                     }
                     buf.append("]");
                     break;
@@ -398,7 +404,7 @@ public class JsonConversion {
                         }
                         buf.append('"').append(text).append('"');
                     } else {
-                        text = m_data.getFixString((int) m_length, charsetName);
+                        text = m_data.getFixString((int) m_length, charset);
                         buf.append('"').append(escapse(text)).append('"');
                     }
 

+ 8 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/JsonDiffConversion.java

@@ -1,5 +1,6 @@
 package com.taobao.tddl.dbsync.binlog;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,6 +37,11 @@ public class JsonDiffConversion {
 
     public static StringBuilder print_json_diff(LogBuffer buffer, long len, String columnName, int columnIndex,
                                                 String charsetName) {
+        return print_json_diff(buffer, len, columnName, columnIndex, Charset.forName(charsetName));
+    }
+
+    public static StringBuilder print_json_diff(LogBuffer buffer, long len, String columnName, int columnIndex,
+                                                Charset charset) {
         int position = buffer.position();
         List<String> operation_names = new ArrayList<>();
         while (buffer.hasRemaining()) {
@@ -111,14 +117,14 @@ public class JsonDiffConversion {
                 Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(),
                     buffer,
                     value_length - 1,
-                    charsetName);
+                    charset);
                 buffer.forward((int) value_length - 1);
                 // Read value
                 if (jsonValue.m_type == Json_enum_type.ERROR) {
                     throw new IllegalArgumentException("parsing json value");
                 }
                 StringBuilder jsonBuilder = new StringBuilder();
-                jsonValue.toJsonString(jsonBuilder, charsetName);
+                jsonValue.toJsonString(jsonBuilder, charset);
                 builder.append(jsonBuilder);
             }
 

+ 114 - 73
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogBuffer.java

@@ -5,6 +5,8 @@ import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.BitSet;
 
@@ -1040,29 +1042,27 @@ public class LogBuffer {
         }
     }
 
-    /* default ANSI charset */
-    public static final String ISO_8859_1 = "ISO-8859-1";
-
     /**
      * Return fix length string from buffer.
      */
     public final String getFixString(final int pos, final int len) {
-        return getFixString(pos, len, ISO_8859_1);
+        return getFixString(pos, len, StandardCharsets.ISO_8859_1);
     }
 
     /**
      * Return next fix length string from buffer.
      */
     public final String getFixString(final int len) {
-        return getFixString(len, ISO_8859_1);
+        return getFixString(len, StandardCharsets.ISO_8859_1);
     }
 
     /**
      * Return fix length string from buffer.
      */
-    public final String getFixString(final int pos, final int len, String charsetName) {
-        if (pos + len > limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                             + (pos < 0 ? pos : (pos + len)));
+    public final String getFixString(final int pos, final int len, Charset charset) {
+        if (pos + len > limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + len)));
+        }
 
         final int from = origin + pos;
         final int end = from + len;
@@ -1071,19 +1071,16 @@ public class LogBuffer {
         for (; (found < end) && buf[found] != '\0'; found++)
             /* empty loop */;
 
-        try {
-            return new String(buf, from, found - from, charsetName);
-        } catch (UnsupportedEncodingException e) {
-            throw new IllegalArgumentException("Unsupported encoding: " + charsetName, e);
-        }
+        return new String(buf, from, found - from, charset);
     }
 
     /**
      * Return next fix length string from buffer.
      */
-    public final String getFixString(final int len, String charsetName) {
-        if (position + len > origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                                + (position + len - origin));
+    public final String getFixString(final int len, Charset charset) {
+        if (position + len > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
+        }
 
         final int from = position;
         final int end = from + len;
@@ -1092,97 +1089,141 @@ public class LogBuffer {
         for (; (found < end) && buf[found] != '\0'; found++)
             /* empty loop */;
 
-        try {
-            String string = new String(buf, from, found - from, charsetName);
-            position += len;
-            return string;
-        } catch (UnsupportedEncodingException e) {
-            throw new IllegalArgumentException("Unsupported encoding: " + charsetName, e);
-        }
+        String string = new String(buf, from, found - from, charset);
+        position += len;
+        return string;
     }
 
-    /**
-     * Return fix-length string from buffer without null-terminate checking. Fix
-     * bug #17 {@link https://github.com/AlibabaTech/canal/issues/17 }
-     */
-    public final String getFullString(final int pos, final int len, String charsetName) {
-        if (pos + len > limit || pos < 0) throw new IllegalArgumentException("limit excceed: "
-                                                                             + (pos < 0 ? pos : (pos + len)));
+    public final String getFixName(final int len, Charset charset) {
+        if (position + len > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
+        }
+
+        final int from = position;
+        final int end = from + len;
+        byte[] buf = buffer;
+        int found = from;
+        for (; (found < end) && buf[found] != '\0'; found++)
+            /* empty loop */;
 
-        try {
-            return new String(buffer, origin + pos, len, charsetName);
-        } catch (UnsupportedEncodingException e) {
-            throw new IllegalArgumentException("Unsupported encoding: " + charsetName, e);
+        int length = found - from;
+        String string = null;
+        if (length <= 16) {
+            string = NameCache.name(buf, from, length, charset);
         }
+        if (string == null) {
+            string = new String(buf, from, length, charset);
+        }
+        position += len;
+        return string;
     }
 
-    /**
-     * Return next fix-length string from buffer without null-terminate
-     * checking. Fix bug #17 {@link https
-     * ://github.com/AlibabaTech/canal/issues/17 }
-     */
-    public final String getFullString(final int len, String charsetName) {
-        if (position + len > origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                                + (position + len - origin));
+    public final String getFixName(final int len) {
+        if (position + len > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
+        }
 
-        try {
-            String string = new String(buffer, position, len, charsetName);
-            position += len;
-            return string;
-        } catch (UnsupportedEncodingException e) {
-            throw new IllegalArgumentException("Unsupported encoding: " + charsetName, e);
+        final int from = position;
+        final int end = from + len;
+        byte[] buf = buffer;
+        int found = from;
+        for (; (found < end) && buf[found] != '\0'; found++)
+            /* empty loop */;
+
+        int length = found - from;
+        String string = null;
+        if (length <= 16) {
+            string = NameCache.name(buf, from, length, StandardCharsets.ISO_8859_1);
+        }
+        if (string == null) {
+            string = new String(buf, from, length, StandardCharsets.ISO_8859_1);
         }
+        position += len;
+        return string;
     }
 
     /**
-     * Return dynamic length string from buffer.
+     * Return fix-length string from buffer without null-terminate checking.
+     * Fix bug #17 {@link https://github.com/AlibabaTech/canal/issues/17 }
      */
-    public final String getString(final int pos) {
-        return getString(pos, ISO_8859_1);
+    public final String getFullString(final int pos, final int len, Charset charset) {
+        if (pos + len > limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + (pos < 0 ? pos : (pos + len)));
+        }
+
+        return new String(buffer, origin + pos, len, charset);
     }
 
     /**
-     * Return next dynamic length string from buffer.
+     * Return next fix-length string from buffer without null-terminate
+     * checking.
+     * Fix bug #17 {@link https://github.com/AlibabaTech/canal/issues/17 }
      */
-    public final String getString() {
-        return getString(ISO_8859_1);
+    public final String getFullString(final int len, Charset charset) {
+        if (position + len > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len - origin));
+        }
+
+        String string = new String(buffer, position, len, charset);
+        position += len;
+        return string;
     }
 
+
     /**
      * Return dynamic length string from buffer.
      */
-    public final String getString(final int pos, String charsetName) {
-        if (pos >= limit || pos < 0) throw new IllegalArgumentException("limit excceed: " + pos);
+    public final String getString(final int pos) {
+        if (pos >= limit || pos < 0) {
+            throw new IllegalArgumentException("limit excceed: " + pos);
+        }
 
         byte[] buf = buffer;
         final int len = (0xff & buf[origin + pos]);
-        if (pos + len + 1 > limit) throw new IllegalArgumentException("limit excceed: " + (pos + len + 1));
-
-        try {
-            return new String(buf, origin + pos + 1, len, charsetName);
-        } catch (UnsupportedEncodingException e) {
-            throw new IllegalArgumentException("Unsupported encoding: " + charsetName, e);
+        if (pos + len + 1 > limit) {
+            throw new IllegalArgumentException("limit excceed: " + (pos + len + 1));
         }
+
+        return new String(buf, origin + pos + 1, len, StandardCharsets.ISO_8859_1);
     }
 
     /**
      * Return next dynamic length string from buffer.
      */
-    public final String getString(String charsetName) {
-        if (position >= origin + limit) throw new IllegalArgumentException("limit excceed: " + position);
+    public final String getString() {
+        if (position >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + position);
+        }
 
         byte[] buf = buffer;
         final int len = (0xff & buf[position]);
-        if (position + len + 1 > origin + limit) throw new IllegalArgumentException("limit excceed: "
-                                                                                    + (position + len + 1 - origin));
-
-        try {
-            String string = new String(buf, position + 1, len, charsetName);
-            position += len + 1;
-            return string;
-        } catch (UnsupportedEncodingException e) {
-            throw new IllegalArgumentException("Unsupported encoding: " + charsetName, e);
+        if (position + len + 1 > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: "
+                                               + (position + len + 1 - origin));
+        }
+
+        String string = new String(buf, position + 1, len, StandardCharsets.ISO_8859_1);
+        position += len + 1;
+        return string;
+    }
+
+    public final String getName() {
+        if (position >= origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + position);
+        }
+
+        byte[] buf = buffer;
+        final int len = (0xff & buf[position]);
+        if (position + len + 1 > origin + limit) {
+            throw new IllegalArgumentException("limit excceed: " + (position + len + 1 - origin));
+        }
+
+        String string = NameCache.name(buf, position + 1, len, StandardCharsets.ISO_8859_1);
+        if (string == null) {
+            string = new String(buf, position + 1, len, StandardCharsets.ISO_8859_1);
         }
+        position += len + 1;
+        return string;
     }
 
     /**

Різницю між файлами не показано, бо вона завелика
+ 0 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/NameCache.java


+ 10 - 7
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java

@@ -18,6 +18,7 @@ public class GtidLogEvent extends LogEvent {
     // / Length of SID in event encoding
     public static final int ENCODED_SID_LENGTH          = 16;
     public static final int LOGICAL_TIMESTAMP_TYPE_CODE = 2;
+    public static final UUID UUID_ZERO                   = new UUID(0, 0);
 
     private boolean         commitFlag;
     private UUID            sid;
@@ -35,11 +36,13 @@ public class GtidLogEvent extends LogEvent {
         buffer.position(commonHeaderLen);
         commitFlag = (buffer.getUint8() != 0); // ENCODED_FLAG_LENGTH
 
-        byte[] bs = buffer.getData(ENCODED_SID_LENGTH);
-        ByteBuffer bb = ByteBuffer.wrap(bs);
-        long high = bb.getLong();
-        long low = bb.getLong();
-        sid = new UUID(high, low);
+        long high = buffer.getBeLong64();
+        long low = buffer.getBeLong64();
+        if (high == 0 && low == 0) {
+            sid = UUID_ZERO;
+        } else {
+            sid = new UUID(high, low);
+        }
 
         gno = buffer.getLong64();
 
@@ -73,11 +76,11 @@ public class GtidLogEvent extends LogEvent {
         return gno;
     }
 
-    public Long getLastCommitted() {
+    public long getLastCommitted() {
         return lastCommitted;
     }
 
-    public Long getSequenceNumber() {
+    public long getSequenceNumber() {
         return sequenceNumber;
     }
 

+ 3 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/HeartbeatLogEvent.java

@@ -3,6 +3,8 @@ package com.taobao.tddl.dbsync.binlog.event;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
+import java.nio.charset.StandardCharsets;
+
 /**
  * <pre>
  * Replication event to ensure to slave that master is alive.
@@ -35,7 +37,7 @@ public class HeartbeatLogEvent extends LogEvent {
             identLen = FN_REFLEN - 1;
         }
 
-        logIdent = buffer.getFullString(commonHeaderLen, identLen, LogBuffer.ISO_8859_1);
+        logIdent = buffer.getFullString(commonHeaderLen, identLen, StandardCharsets.ISO_8859_1);
     }
 
     public int getIdentLen() {

+ 1 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/IncidentLogEvent.java

@@ -67,7 +67,7 @@ public final class IncidentLogEvent extends LogEvent {
         incident = incidentNumber;
 
         buffer.position(commonHeaderLen + postHeaderLen);
-        message = buffer.getString();
+        message = buffer.getName();
     }
 
     public final int getIncident() {

+ 12 - 12
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java

@@ -433,7 +433,7 @@ public class QueryLogEvent extends LogEvent {
     private int             serverCollation           = -1;
     private int             tvSec                     = -1;
     private BigInteger      ddlXid                    = BigInteger.valueOf(-1L);
-    private String          charsetName;
+    private Charset         charset;
 
     private String          timezone;
 
@@ -498,12 +498,12 @@ public class QueryLogEvent extends LogEvent {
 
         /* A 2nd variable part; this is common to all versions */
         final int queryLen = dataLen - dbLen - 1;
-        dbname = buffer.getFixString(dbLen + 1);
+        dbname = buffer.getFixName(dbLen + 1);
         if (clientCharset >= 0) {
-            charsetName = CharsetConversion.getJavaCharset(clientCharset);
+            charset = CharsetConversion.getNioCharset(clientCharset);
 
-            if ((charsetName != null) && (Charset.isSupported(charsetName))) {
-                query = buffer.getFixString(queryLen, charsetName);
+            if (charset != null) {
+                query = buffer.getFixString(queryLen, charset);
             } else {
                 logger.warn("unsupported character set in query log: " + "\n    ID = " + clientCharset + ", Charset = "
                             + CharsetConversion.getCharset(clientCharset) + ", Collation = "
@@ -623,7 +623,7 @@ public class QueryLogEvent extends LogEvent {
                         // is ulonglong
                         break;
                     case Q_CATALOG_NZ_CODE:
-                        catalog = buffer.getString();
+                        catalog = buffer.getName();
                         break;
                     case Q_AUTO_INCREMENT:
                         autoIncrementIncrement = buffer.getUint16();
@@ -639,7 +639,7 @@ public class QueryLogEvent extends LogEvent {
                         serverCollation = buffer.getUint16();
                         break;
                     case Q_TIME_ZONE_CODE:
-                        timezone = buffer.getString();
+                        timezone = buffer.getName();
                         break;
                     case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
                         final int len = buffer.getUint8();
@@ -663,8 +663,8 @@ public class QueryLogEvent extends LogEvent {
                         buffer.forward(4);
                         break;
                     case Q_INVOKER:
-                        user = buffer.getString();
-                        host = buffer.getString();
+                        user = buffer.getName();
+                        host = buffer.getName();
                         break;
                     case Q_MICROSECONDS:
                         // when.tv_usec= uint3korr(pos);
@@ -685,7 +685,7 @@ public class QueryLogEvent extends LogEvent {
                         String mtsAccessedDbNames[] = new String[mtsAccessedDbs];
                         for (int i = 0; i < mtsAccessedDbs && buffer.position() < end; i++) {
                             int length = end - buffer.position();
-                            mtsAccessedDbNames[i] = buffer.getFixString(length < NAME_LEN ? length : NAME_LEN);
+                            mtsAccessedDbNames[i] = buffer.getFixName(length < NAME_LEN ? length : NAME_LEN);
                         }
                         break;
                     case Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:
@@ -829,8 +829,8 @@ public class QueryLogEvent extends LogEvent {
         return autoIncrementOffset;
     }
 
-    public final String getCharsetName() {
-        return charsetName;
+    public final Charset getCharset() {
+        return charset;
     }
 
     public final String getTimezone() {

+ 9 - 8
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -1,6 +1,7 @@
 package com.taobao.tddl.dbsync.binlog.event;
 
 import java.io.Serializable;
+import java.nio.charset.Charset;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.BitSet;
@@ -33,7 +34,7 @@ public final class RowsLogBuffer {
     private final LogBuffer    buffer;
     private final int          columnLen;
     private final int          jsonColumnCount;
-    private final String       charsetName;
+    private final Charset      charset;
 
     private final BitSet       nullBits;
     private int                nullBitIndex;
@@ -47,10 +48,10 @@ public final class RowsLogBuffer {
     private int                length;
     private Serializable       value;
 
-    public RowsLogBuffer(LogBuffer buffer, final int columnLen, String charsetName, int jsonColumnCount, boolean partial){
+    public RowsLogBuffer(LogBuffer buffer, final int columnLen, Charset charset, int jsonColumnCount, boolean partial){
         this.buffer = buffer;
         this.columnLen = columnLen;
-        this.charsetName = charsetName;
+        this.charset = charset;
         this.partial = partial;
         this.jsonColumnCount = jsonColumnCount;
         this.nullBits = new BitSet(columnLen);
@@ -1017,7 +1018,7 @@ public final class RowsLogBuffer {
                     javaType = Types.VARBINARY;
                     value = binary;
                 } else {
-                    value = buffer.getFullString(len, charsetName);
+                    value = buffer.getFullString(len, charset);
                     javaType = Types.VARCHAR;
                 }
 
@@ -1039,7 +1040,7 @@ public final class RowsLogBuffer {
                     javaType = Types.BINARY;
                     value = binary;
                 } else {
-                    value = buffer.getFullString(len, charsetName);
+                    value = buffer.getFullString(len, charset);
                     javaType = Types.CHAR; // Types.VARCHAR;
                 }
                 length = len;
@@ -1074,7 +1075,7 @@ public final class RowsLogBuffer {
                         len,
                         columnName,
                         columnIndex,
-                        charsetName);
+                            charset);
                     value = builder.toString();
                     buffer.position(position + len);
                 } else {
@@ -1088,9 +1089,9 @@ public final class RowsLogBuffer {
                         Json_Value jsonValue = JsonConversion.parse_value(buffer.getUint8(),
                             buffer,
                             len - 1,
-                            charsetName);
+                                charset);
                         StringBuilder builder = new StringBuilder();
-                        jsonValue.toJsonString(builder, charsetName);
+                        jsonValue.toJsonString(builder, charset);
                         value = builder.toString();
                         buffer.position(position + len);
                     }

+ 3 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java

@@ -1,5 +1,6 @@
 package com.taobao.tddl.dbsync.binlog.event;
 
+import java.nio.charset.Charset;
 import java.util.BitSet;
 
 import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
@@ -219,8 +220,8 @@ public abstract class RowsLogEvent extends LogEvent {
         return changeColumns;
     }
 
-    public final RowsLogBuffer getRowsBuf(String charsetName) {
-        return new RowsLogBuffer(rowsBuf, columnLen, charsetName, jsonColumnCount, partial);
+    public final RowsLogBuffer getRowsBuf(Charset charset) {
+        return new RowsLogBuffer(rowsBuf, columnLen, charset, jsonColumnCount, partial);
     }
 
     public final int getFlags(final int flags) {

+ 3 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsQueryLogEvent.java

@@ -2,6 +2,8 @@ package com.taobao.tddl.dbsync.binlog.event;
 
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 
+import java.nio.charset.StandardCharsets;
+
 /**
  * @author jianghang 2013-4-8 上午12:36:29
  * @version 1.0.3
@@ -23,7 +25,7 @@ public class RowsQueryLogEvent extends IgnorableLogEvent {
          */
         int offset = commonHeaderLen + postHeaderLen + 1;
         int len = buffer.limit() - offset;
-        rowsQuery = buffer.getFullString(offset, len, LogBuffer.ISO_8859_1);
+        rowsQuery = buffer.getFullString(offset, len, StandardCharsets.ISO_8859_1);
     }
 
     public String getRowsQuery() {

+ 1 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/StartLogEventV3.java

@@ -36,7 +36,7 @@ public class StartLogEventV3 extends LogEvent {
 
         buffer.position(descriptionEvent.commonHeaderLen);
         binlogVersion = buffer.getUint16(); // ST_BINLOG_VER_OFFSET
-        serverVersion = buffer.getFixString(ST_SERVER_VER_LEN); // ST_SERVER_VER_OFFSET
+        serverVersion = buffer.getFixName(ST_SERVER_VER_LEN); // ST_SERVER_VER_OFFSET
     }
 
     public StartLogEventV3(){

+ 2 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java

@@ -423,10 +423,10 @@ public final class TableMapLogEvent extends LogEvent {
 
         /* Read the variable part of the event */
         buffer.position(commonHeaderLen + postHeaderLen);
-        dbname = buffer.getString();
+        dbname = buffer.getName();
         buffer.forward(1); /* termination null */
         // fixed issue #2714
-        tblname = buffer.getString();
+        tblname = buffer.getName();
         buffer.forward(1); /* termination null */
 
         // Read column information from buffer

+ 3 - 2
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/UserVarLogEvent.java

@@ -2,6 +2,7 @@ package com.taobao.tddl.dbsync.binlog.event;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.Charset;
 
 import com.taobao.tddl.dbsync.binlog.CharsetConversion;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
@@ -100,8 +101,8 @@ public final class UserVarLogEvent extends LogEvent {
                     value = buffer.getDecimal(precision, scale); // bin2decimal
                     break;
                 case STRING_RESULT:
-                    String charsetName = CharsetConversion.getJavaCharset(charsetNumber);
-                    value = buffer.getFixString(valueLen, charsetName);
+                    Charset charset = CharsetConversion.getNioCharset(charsetNumber);
+                    value = buffer.getFixString(valueLen, charset);
                     break;
                 case ROW_RESULT:
                     // this seems to be banned in MySQL altogether

+ 3 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/mariadb/AnnotateRowsEvent.java

@@ -5,6 +5,8 @@ import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.IgnorableLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.LogHeader;
 
+import java.nio.charset.StandardCharsets;
+
 /**
  * mariadb的ANNOTATE_ROWS_EVENT类型
  * 
@@ -23,7 +25,7 @@ public class AnnotateRowsEvent extends IgnorableLogEvent {
 
         int offset = commonHeaderLen + postHeaderLen;
         int len = buffer.limit() - offset;
-        rowsQuery = buffer.getFullString(offset, len, LogBuffer.ISO_8859_1);
+        rowsQuery = buffer.getFullString(offset, len, StandardCharsets.ISO_8859_1);
     }
 
     public String getRowsQuery() {

+ 1 - 1
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/BaseLogFetcherTest.java

@@ -52,7 +52,7 @@ public class BaseLogFetcherTest {
                 event.getHeader().getLogPos() - event.getHeader().getEventLen(),
                 event.getTable().getDbName(),
                 event.getTable().getTableName()));
-            RowsLogBuffer buffer = event.getRowsBuf(charset.name());
+            RowsLogBuffer buffer = event.getRowsBuf(charset);
             BitSet columns = event.getColumns();
             BitSet changeColumns = event.getChangeColumns();
             while (buffer.nextOneRow(columns)) {

+ 1 - 1
meta/src/main/java/com/alibaba/otter/canal/meta/FileMixedMetaManager.java

@@ -171,7 +171,7 @@ public class FileMixedMetaManager extends MemoryMetaManager implements CanalMeta
                 return null;
             }
 
-            String json = FileUtils.readFileToString(dataFile, charset.name());
+            String json = FileUtils.readFileToString(dataFile, charset);
             return JsonUtils.unmarshalFromString(json, FileMetaInstanceData.class);
         } catch (IOException e) {
             throw new CanalMetaManagerException(e);

+ 6 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -175,7 +175,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         builder.setKey("gtid");
         builder.setValue(logEvent.getGtidStr());
 
-        if (logEvent.getLastCommitted() != null) {
+        if (logEvent.getLastCommitted() != -1) {
             builder.setKey("lastCommitted");
             builder.setValue(String.valueOf(logEvent.getLastCommitted()));
             builder.setKey("sequenceNumber");
@@ -392,7 +392,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         // mysql5.6支持,需要设置binlog-rows-query-log-events=1,可详细打印原始DML语句
         String queryString = null;
         try {
-            queryString = new String(event.getRowsQuery().getBytes(ISO_8859_1), charset.name());
+            queryString = new String(event.getRowsQuery().getBytes(ISO_8859_1), charset);
             String tableName = null;
             if (useDruidDdlFilter) {
                 List<DdlResult> results = DruidDdlParser.parse(queryString, null);
@@ -414,7 +414,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         // mariaDb支持,需要设置binlog_annotate_row_events=true,可详细打印原始DML语句
         String queryString = null;
         try {
-            queryString = new String(event.getRowsQuery().getBytes(ISO_8859_1), charset.name());
+            queryString = new String(event.getRowsQuery().getBytes(ISO_8859_1), charset);
             return buildQueryEntry(queryString, event.getHeader());
         } catch (UnsupportedEncodingException e) {
             throw new CanalParseException(e);
@@ -508,10 +508,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     public void parseTableMapEvent(TableMapLogEvent event) {
         try {
-            String charsetDbName = new String(event.getDbName().getBytes(ISO_8859_1), charset.name());
+            String charsetDbName = new String(event.getDbName().getBytes(ISO_8859_1), charset);
             event.setDbname(charsetDbName);
 
-            String charsetTbName = new String(event.getTableName().getBytes(ISO_8859_1), charset.name());
+            String charsetTbName = new String(event.getTableName().getBytes(ISO_8859_1), charset);
             event.setTblname(charsetTbName);
         } catch (UnsupportedEncodingException e) {
             throw new CanalParseException(e);
@@ -550,7 +550,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             rowChangeBuider.setIsDdl(false);
 
             rowChangeBuider.setEventType(eventType);
-            RowsLogBuffer buffer = event.getRowsBuf(charset.name());
+            RowsLogBuffer buffer = event.getRowsBuf(charset);
             BitSet columns = event.getColumns();
             BitSet changeColumns = event.getChangeColumns();
 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java

@@ -182,7 +182,7 @@ public class FileMixedLogPositionManager extends AbstractLogPositionManager {
                 return null;
             }
 
-            String json = FileUtils.readFileToString(dataFile, charset.name());
+            String json = FileUtils.readFileToString(dataFile, charset);
             return JsonUtils.unmarshalFromString(json, LogPosition.class);
         } catch (IOException e) {
             throw new CanalMetaManagerException(e);

+ 1 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java

@@ -134,7 +134,7 @@ public class MysqlBinlogParsePerformanceTest {
 
     public static void parseRowsEvent(RowsLogEvent event, AtomicLong sum) {
         try {
-            RowsLogBuffer buffer = event.getRowsBuf(charset.name());
+            RowsLogBuffer buffer = event.getRowsBuf(charset);
             BitSet columns = event.getColumns();
             BitSet changeColumns = event.getChangeColumns();
             while (buffer.nextOneRow(columns)) {

Деякі файли не було показано, через те що забагато файлів було змінено