Browse Source

fixed performace

七锋 7 years ago
parent
commit
9b7de00efb

+ 16 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/FileLogFetcher.java

@@ -75,7 +75,9 @@ public final class FileLogFetcher extends LogFetcher {
         fin = new FileInputStream(file);
 
         ensureCapacity(BIN_LOG_HEADER_SIZE);
-        if (BIN_LOG_HEADER_SIZE != fin.read(buffer, 0, BIN_LOG_HEADER_SIZE)) throw new IOException("No binlog file header");
+        if (BIN_LOG_HEADER_SIZE != fin.read(buffer, 0, BIN_LOG_HEADER_SIZE)) {
+            throw new IOException("No binlog file header");
+        }
 
         if (buffer[0] != BINLOG_MAGIC[0] || buffer[1] != BINLOG_MAGIC[1] || buffer[2] != BINLOG_MAGIC[2]
             || buffer[3] != BINLOG_MAGIC[3]) {
@@ -128,6 +130,19 @@ public final class FileLogFetcher extends LogFetcher {
                 return true;
             }
         } else if (limit > 0) {
+            if (limit >= FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN) {
+                int lenPosition = position + 4 + 1 + 4;
+                long eventLen = ((long) (0xff & buffer[lenPosition++])) | ((long) (0xff & buffer[lenPosition++]) << 8)
+                                | ((long) (0xff & buffer[lenPosition++]) << 16)
+                                | ((long) (0xff & buffer[lenPosition++]) << 24);
+
+                if (limit >= eventLen) {
+                    return true;
+                } else {
+                    ensureCapacity((int) eventLen);
+                }
+            }
+
             System.arraycopy(buffer, origin, buffer, 0, limit);
             position -= origin;
             origin = 0;

+ 111 - 26
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -480,13 +480,27 @@ public final class RowsLogBuffer {
                     // cal.set(d / 10000, (d % 10000) / 100 - 1, d % 100, t /
                     // 10000, (t % 10000) / 100, t % 100);
                     // value = new Timestamp(cal.getTimeInMillis());
-                    value = String.format("%04d-%02d-%02d %02d:%02d:%02d",
-                        d / 10000,
-                        (d % 10000) / 100,
-                        d % 100,
-                        t / 10000,
-                        (t % 10000) / 100,
-                        t % 100);
+                    // value = String.format("%04d-%02d-%02d %02d:%02d:%02d",
+                    // d / 10000,
+                    // (d % 10000) / 100,
+                    // d % 100,
+                    // t / 10000,
+                    // (t % 10000) / 100,
+                    // t % 100);
+
+                    StringBuilder builder = new StringBuilder();
+                    builder.append(formatNumber(d / 10000, 4))
+                        .append('-')
+                        .append(formatNumber((d % 10000) / 100, 2))
+                        .append('-')
+                        .append(formatNumber(d % 100, 2))
+                        .append(' ')
+                        .append(formatNumber(t / 10000, 2))
+                        .append(':')
+                        .append(formatNumber((t % 10000) / 100, 2))
+                        .append(':')
+                        .append(formatNumber(t % 100, 2));
+                    value = builder.toString();
                 }
                 javaType = Types.TIMESTAMP;
                 length = 8;
@@ -540,13 +554,27 @@ public final class RowsLogBuffer {
                     // % (1 << 5)), (int) (hms >> 12),
                     // (int) ((hms >> 6) % (1 << 6)), (int) (hms % (1 << 6)));
                     // value = new Timestamp(cal.getTimeInMillis());
-                    second = String.format("%04d-%02d-%02d %02d:%02d:%02d",
-                        (int) (ym / 13),
-                        (int) (ym % 13),
-                        (int) (ymd % (1 << 5)),
-                        (int) (hms >> 12),
-                        (int) ((hms >> 6) % (1 << 6)),
-                        (int) (hms % (1 << 6)));
+                    // second = String.format("%04d-%02d-%02d %02d:%02d:%02d",
+                    // (int) (ym / 13),
+                    // (int) (ym % 13),
+                    // (int) (ymd % (1 << 5)),
+                    // (int) (hms >> 12),
+                    // (int) ((hms >> 6) % (1 << 6)),
+                    // (int) (hms % (1 << 6)));
+
+                    StringBuilder builder = new StringBuilder();
+                    builder.append(formatNumber((int) (ym / 13), 4))
+                        .append('-')
+                        .append(formatNumber((int) (ym % 13), 2))
+                        .append('-')
+                        .append(formatNumber((int) (ymd % (1 << 5)), 2))
+                        .append(' ')
+                        .append(formatNumber((int) (hms >> 12), 2))
+                        .append(':')
+                        .append(formatNumber((int) ((hms >> 6) % (1 << 6)), 2))
+                        .append(':')
+                        .append(formatNumber((int) (hms % (1 << 6)), 2));
+                    second = builder.toString();
                 }
 
                 if (meta >= 1) {
@@ -575,11 +603,22 @@ public final class RowsLogBuffer {
                     // cal.set(70, 0, 1, i32 / 10000, (i32 % 10000) / 100, i32 %
                     // 100);
                     // value = new Time(cal.getTimeInMillis());
-                    value = String.format("%s%02d:%02d:%02d",
-                        (i32 >= 0) ? "" : "-",
-                        u32 / 10000,
-                        (u32 % 10000) / 100,
-                        u32 % 100);
+                    // value = String.format("%s%02d:%02d:%02d",
+                    // (i32 >= 0) ? "" : "-",
+                    // u32 / 10000,
+                    // (u32 % 10000) / 100,
+                    // u32 % 100);
+
+                    StringBuilder builder = new StringBuilder();
+                    if (i32 < 0) {
+                        builder.append('-');
+                    }
+                    builder.append(formatNumber(u32 / 10000, 2))
+                        .append(':')
+                        .append(formatNumber((u32 % 10000) / 100, 2))
+                        .append(':')
+                        .append(formatNumber(u32 % 100, 2));
+                    value = builder.toString();
                 }
                 javaType = Types.TIME;
                 length = 3;
@@ -673,11 +712,22 @@ public final class RowsLogBuffer {
                     // value = new Time(cal.getTimeInMillis());
                     long ultime = Math.abs(ltime);
                     intpart = ultime >> 24;
-                    second = String.format("%s%02d:%02d:%02d",
-                        ltime >= 0 ? "" : "-",
-                        (int) ((intpart >> 12) % (1 << 10)),
-                        (int) ((intpart >> 6) % (1 << 6)),
-                        (int) (intpart % (1 << 6)));
+                    // second = String.format("%s%02d:%02d:%02d",
+                    // ltime >= 0 ? "" : "-",
+                    // (int) ((intpart >> 12) % (1 << 10)),
+                    // (int) ((intpart >> 6) % (1 << 6)),
+                    // (int) (intpart % (1 << 6)));
+
+                    StringBuilder builder = new StringBuilder();
+                    if (ltime < 0) {
+                        builder.append('-');
+                    }
+                    builder.append(formatNumber((int) ((intpart >> 12) % (1 << 10)), 2))
+                        .append(':')
+                        .append(formatNumber((int) ((intpart >> 6) % (1 << 6)), 2))
+                        .append(':')
+                        .append(formatNumber((int) (intpart % (1 << 6)), 2));
+                    second = builder.toString();
                 }
 
                 if (meta >= 1) {
@@ -717,7 +767,16 @@ public final class RowsLogBuffer {
                     // cal.set((i32 / (16 * 32)), (i32 / 32 % 16) - 1, (i32 %
                     // 32));
                     // value = new java.sql.Date(cal.getTimeInMillis());
-                    value = String.format("%04d-%02d-%02d", i32 / (16 * 32), i32 / 32 % 16, i32 % 32);
+                    // value = String.format("%04d-%02d-%02d", i32 / (16 * 32),
+                    // i32 / 32 % 16, i32 % 32);
+
+                    StringBuilder builder = new StringBuilder();
+                    builder.append(formatNumber(i32 / (16 * 32), 4))
+                        .append('-')
+                        .append(formatNumber(i32 / 32 % 16, 2))
+                        .append('-')
+                        .append(formatNumber(i32 % 32, 2));
+                    value = builder.toString();
                 }
                 javaType = Types.DATE;
                 length = 3;
@@ -970,7 +1029,8 @@ public final class RowsLogBuffer {
                         throw new IllegalArgumentException("!! Unknown JSON packlen = " + meta);
                 }
                 if (0 == len) {
-                    // fixed issue #1 by lava, json column of zero length has no value, value parsing should be skipped
+                    // fixed issue #1 by lava, json column of zero length has no
+                    // value, value parsing should be skipped
                     value = "";
                 } else {
                     int position = buffer.position();
@@ -1066,4 +1126,29 @@ public final class RowsLogBuffer {
         return sec.substring(0, meta);
     }
 
+    private String formatNumber(int d, int size) {
+        return leftPad(String.valueOf(d), size, '0');
+    }
+
+    private String leftPad(String str, int size, char padChar) {
+        if (str == null) {
+            return null;
+        }
+        int pads = size - str.length();
+        if (pads <= 0) {
+            return str; // returns original String when possible
+        }
+        return padding(pads, padChar).concat(str);
+    }
+
+    private String padding(int repeat, char padChar) throws IndexOutOfBoundsException {
+        if (repeat < 0) {
+            throw new IndexOutOfBoundsException("Cannot pad a negative amount: " + repeat);
+        }
+        final char[] buf = new char[repeat];
+        for (int i = 0; i < buf.length; i++) {
+            buf[i] = padChar;
+        }
+        return new String(buf);
+    }
 }

+ 1 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -8,7 +8,6 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
@@ -91,7 +90,7 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                     event = decoder.decode(fetcher, context);
                     if (event == null) {
-                        throw new CanalParseException("parse failed");
+                        continue;
                     }
 
                     if (!func.sink(event)) {