瀏覽代碼

fixed issue #726 , 提交测试用例

七锋 6 年之前
父節點
當前提交
77c1c65b83

+ 57 - 0
dbsync/src/test/java/com/taobao/tddl/dbsync/FetcherPerformanceTest.java

@@ -0,0 +1,57 @@
+package com.taobao.tddl.dbsync;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.taobao.tddl.dbsync.binlog.DirectLogFetcher;
+import com.taobao.tddl.dbsync.binlog.LogContext;
+import com.taobao.tddl.dbsync.binlog.LogDecoder;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
+public class FetcherPerformanceTest {
+
+    public static void main(String args[]) {
+        DirectLogFetcher fetcher = new DirectLogFetcher();
+        try {
+            Class.forName("com.mysql.jdbc.Driver");
+            Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306",
+                "root",
+                "hello");
+            Statement statement = connection.createStatement();
+            statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
+            statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
+
+            fetcher.open(connection, "mysql-bin.000006", 120L, 2);
+
+            AtomicLong sum = new AtomicLong(0);
+            long start = System.currentTimeMillis();
+            long last = 0;
+            long end = 0;
+
+            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+            LogContext context = new LogContext();
+            while (fetcher.fetch()) {
+                decoder.decode(fetcher, context);
+                sum.incrementAndGet();
+                long current = sum.get();
+                if (current - last >= 100000) {
+                    end = System.currentTimeMillis();
+                    long tps = ((current - last) * 1000) / (end - start);
+                    System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps);
+                    last = current;
+                    start = end;
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                fetcher.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+}

+ 68 - 0
example/src/main/java/com/alibaba/otter/canal/example/SimpleCanalClientPermanceTest.java

@@ -0,0 +1,68 @@
+package com.alibaba.otter.canal.example;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
+
+public class SimpleCanalClientPermanceTest {
+
+    public static void main(String args[]) {
+        String destination = "example";
+        String ip = "127.0.0.1";
+        int batchSize = 1024;
+        int count = 0;
+        int sum = 0;
+        int perSum = 0;
+        long start = System.currentTimeMillis();
+        long end = 0;
+        final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(100);
+        try {
+            final CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
+                destination,
+                "",
+                "");
+
+            Thread ackThread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    while (true) {
+                        try {
+                            long batchId = queue.take();
+                            connector.ack(batchId);
+                        } catch (InterruptedException e) {
+                        }
+                    }
+                }
+            });
+            ackThread.start();
+
+            ((SimpleCanalConnector) connector).setLazyParseEntry(true);
+            connector.connect();
+            connector.subscribe();
+            while (true) {
+                Message message = connector.getWithoutAck(batchSize);
+                long batchId = message.getId();
+                int size = message.getRawEntries().size();
+                sum += size;
+                perSum += size;
+                count++;
+                queue.add(batchId);
+                if (count % 10 == 0) {
+                    end = System.currentTimeMillis();
+                    long tps = (perSum * 1000) / (end - start);
+                    System.out.println(" total : " + sum + " , current : " + perSum + " , cost : " + (end - start)
+                                       + " , tps : " + tps);
+                    start = end;
+                    perSum = 0;
+                }
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 89 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java

@@ -0,0 +1,89 @@
+package com.alibaba.otter.canal.parse;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
+import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
+import com.alibaba.otter.canal.sink.CanalEventSink;
+import com.alibaba.otter.canal.sink.exception.CanalSinkException;
+
+public class MysqlBinlogDumpPerformanceTest {
+
+    public static void main(String args[]) {
+        final MysqlEventParser controller = new MysqlEventParser();
+        final EntryPosition startPosition = new EntryPosition("mysql-bin.001699", 120L, 100L);
+        controller.setConnectionCharset(Charset.forName("UTF-8"));
+        controller.setSlaveId(3344L);
+        controller.setDetectingEnable(false);
+        controller.setFilterQueryDml(true);
+        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3328), "root", "hello"));
+        controller.setMasterPosition(startPosition);
+        controller.setEnableTsdb(false);
+        controller.setDestination("example");
+        controller.setTsdbSpringXml("classpath:spring/tsdb/h2-tsdb.xml");
+        // controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
+        // controller.setEventBlackFilter(new
+        // AviaterRegexFilter("canal_tsdb\\..*"));
+        controller.setParallel(true);
+        controller.setParallelBufferSize(256);
+        controller.setParallelThreadSize(16);
+        controller.setIsGTIDMode(false);
+        final AtomicLong sum = new AtomicLong(0);
+        final AtomicLong last = new AtomicLong(0);
+        final AtomicLong start = new AtomicLong(System.currentTimeMillis());
+        final AtomicLong end = new AtomicLong(0);
+        controller.setEventSink(new AbstractCanalEventSinkTest<List<CanalEntry.Entry>>() {
+
+            public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
+                                                                                                                   throws CanalSinkException,
+                                                                                                                   InterruptedException {
+
+                sum.addAndGet(entrys.size());
+                long current = sum.get();
+                if (current - last.get() >= 100000) {
+                    end.set(System.currentTimeMillis());
+                    long tps = ((current - last.get()) * 1000) / (end.get() - start.get());
+                    System.out.println(" total : " + sum + " , cost : " + (end.get() - start.get()) + " , tps : " + tps);
+                    last.set(current);
+                    start.set(end.get());
+                }
+                return true;
+            }
+
+        });
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
+
+            @Override
+            public LogPosition getLatestIndexBy(String destination) {
+                return null;
+            }
+
+            @Override
+            public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
+            }
+        });
+
+        controller.start();
+
+        try {
+            Thread.sleep(100 * 1000 * 1000L);
+        } catch (InterruptedException e) {
+        }
+        controller.stop();
+    }
+
+    public static abstract class AbstractCanalEventSinkTest<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> {
+
+        public void interrupt() {
+        }
+    }
+}

+ 82 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogEventPerformanceTest.java

@@ -0,0 +1,82 @@
+package com.alibaba.otter.canal.parse;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
+import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
+import com.taobao.tddl.dbsync.binlog.LogContext;
+import com.taobao.tddl.dbsync.binlog.LogDecoder;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
+public class MysqlBinlogEventPerformanceTest {
+
+    protected static Charset charset = Charset.forName("utf-8");
+
+    public static void main(String args[]) {
+        DirectLogFetcher fetcher = new DirectLogFetcher();
+        try {
+            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+            connector.connect();
+            updateSettings(connector);
+            sendBinlogDump(connector, "mysql-bin.000006", 120L, 3);
+            fetcher.start(connector.getChannel());
+            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+            LogContext context = new LogContext();
+            AtomicLong sum = new AtomicLong(0);
+            long start = System.currentTimeMillis();
+            long last = 0;
+            long end = 0;
+            while (fetcher.fetch()) {
+                decoder.decode(fetcher, context);
+                sum.incrementAndGet();
+                long current = sum.get();
+                if (current - last >= 100000) {
+                    end = System.currentTimeMillis();
+                    long tps = ((current - last) * 1000) / (end - start);
+                    System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps);
+                    last = current;
+                    start = end;
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                fetcher.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+
+    private static void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId)
+                                                                                                                         throws IOException {
+        BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
+        binlogDumpCmd.binlogFileName = binlogfilename;
+        binlogDumpCmd.binlogPosition = binlogPosition;
+        binlogDumpCmd.slaveServerId = slaveId;
+        byte[] cmdBody = binlogDumpCmd.toBytes();
+
+        HeaderPacket binlogDumpHeader = new HeaderPacket();
+        binlogDumpHeader.setPacketBodyLength(cmdBody.length);
+        binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
+        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
+    }
+
+    private static void updateSettings(MysqlConnector connector) throws IOException {
+        update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector);
+        update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'", connector);
+    }
+
+    public static void update(String cmd, MysqlConnector connector) throws IOException {
+        MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector);
+        exector.update(cmd);
+    }
+
+}

+ 192 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java

@@ -0,0 +1,192 @@
+package com.alibaba.otter.canal.parse;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.BitSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
+import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogContext;
+import com.taobao.tddl.dbsync.binlog.LogDecoder;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer;
+import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
+import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
+
+public class MysqlBinlogParsePerformanceTest {
+
+    protected static Charset charset = Charset.forName("utf-8");
+
+    public static void main(String args[]) {
+        DirectLogFetcher fetcher = new DirectLogFetcher();
+        try {
+            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+            connector.connect();
+            updateSettings(connector);
+            sendBinlogDump(connector, "mysql-bin.000006", 120L, 3);
+            fetcher.start(connector.getChannel());
+            final BlockingQueue<LogBuffer> buffer = new ArrayBlockingQueue<LogBuffer>(1024);
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        consumer(buffer);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+            thread.start();
+
+            while (fetcher.fetch()) {
+                buffer.put(fetcher.duplicate());
+                fetcher.consume(fetcher.limit());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                fetcher.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+
+    public static void consumer(BlockingQueue<LogBuffer> buffer) throws IOException, InterruptedException {
+        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+        LogContext context = new LogContext();
+
+        AtomicLong sum = new AtomicLong(0);
+        long start = System.currentTimeMillis();
+        long last = 0;
+        long end = 0;
+        while (true) {
+            LogEvent event = null;
+            event = decoder.decode(buffer.take(), context);
+            int eventType = event.getHeader().getType();
+            switch (eventType) {
+                case LogEvent.ROTATE_EVENT:
+                    break;
+                case LogEvent.WRITE_ROWS_EVENT_V1:
+                case LogEvent.WRITE_ROWS_EVENT:
+                    parseRowsEvent((WriteRowsLogEvent) event, sum);
+                    break;
+                case LogEvent.UPDATE_ROWS_EVENT_V1:
+                case LogEvent.UPDATE_ROWS_EVENT:
+                    parseRowsEvent((UpdateRowsLogEvent) event, sum);
+                    break;
+                case LogEvent.DELETE_ROWS_EVENT_V1:
+                case LogEvent.DELETE_ROWS_EVENT:
+                    parseRowsEvent((DeleteRowsLogEvent) event, sum);
+                    break;
+                case LogEvent.XID_EVENT:
+                    sum.incrementAndGet();
+                    break;
+                case LogEvent.QUERY_EVENT:
+                    sum.incrementAndGet();
+                    break;
+                default:
+                    break;
+            }
+
+            long current = sum.get();
+            if (current - last >= 100000) {
+                end = System.currentTimeMillis();
+                long tps = ((current - last) * 1000) / (end - start);
+                System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps);
+                last = current;
+                start = end;
+            }
+        }
+    }
+
+    private static void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId)
+                                                                                                                         throws IOException {
+        BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
+        binlogDumpCmd.binlogFileName = binlogfilename;
+        binlogDumpCmd.binlogPosition = binlogPosition;
+        binlogDumpCmd.slaveServerId = slaveId;
+        byte[] cmdBody = binlogDumpCmd.toBytes();
+
+        HeaderPacket binlogDumpHeader = new HeaderPacket();
+        binlogDumpHeader.setPacketBodyLength(cmdBody.length);
+        binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
+        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
+    }
+
+    private static void updateSettings(MysqlConnector connector) throws IOException {
+        update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector);
+        update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'", connector);
+    }
+
+    public static void update(String cmd, MysqlConnector connector) throws IOException {
+        MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector);
+        exector.update(cmd);
+    }
+
+    public static void parseRowsEvent(RowsLogEvent event, AtomicLong sum) {
+        try {
+            RowsLogBuffer buffer = event.getRowsBuf(charset.name());
+            BitSet columns = event.getColumns();
+            BitSet changeColumns = event.getChangeColumns();
+            while (buffer.nextOneRow(columns)) {
+                int type = event.getHeader().getType();
+                if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type) {
+                    parseOneRow(event, buffer, columns, true);
+                } else if (LogEvent.DELETE_ROWS_EVENT_V1 == type || LogEvent.DELETE_ROWS_EVENT == type) {
+                    parseOneRow(event, buffer, columns, false);
+                } else {
+                    parseOneRow(event, buffer, columns, false);
+                    if (!buffer.nextOneRow(changeColumns)) {
+                        break;
+                    }
+                    parseOneRow(event, buffer, changeColumns, true);
+                }
+
+                sum.incrementAndGet();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("parse row data failed.", e);
+        }
+    }
+
+    public static void parseOneRow(RowsLogEvent event, RowsLogBuffer buffer, BitSet cols, boolean isAfter)
+                                                                                                          throws UnsupportedEncodingException {
+        TableMapLogEvent map = event.getTable();
+        if (map == null) {
+            throw new RuntimeException("not found TableMap with tid=" + event.getTableId());
+        }
+
+        final int columnCnt = map.getColumnCnt();
+        final ColumnInfo[] columnInfo = map.getColumnInfo();
+        for (int i = 0; i < columnCnt; i++) {
+            if (!cols.get(i)) {
+                continue;
+            }
+
+            ColumnInfo info = columnInfo[i];
+            buffer.nextValue(info.type, info.meta);
+            if (buffer.isNull()) {
+            } else {
+                buffer.getValue();
+            }
+        }
+    }
+}