Bladeren bron

fixed issue #726 , 优化完毕

七锋 7 jaren geleden
bovenliggende
commit
fe38bc16d2

+ 41 - 52
dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

@@ -8,15 +8,6 @@ import java.sql.Statement;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
-
 public class DirectLogFetcherTest extends BaseLogFetcherTest {
 
     @Test
@@ -24,56 +15,54 @@ public class DirectLogFetcherTest extends BaseLogFetcherTest {
         DirectLogFetcher fecther = new DirectLogFetcher();
         try {
             Class.forName("com.mysql.jdbc.Driver");
-            Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");
+            Connection connection = DriverManager.getConnection("jdbc:mysql://100.81.154.142:3306", "root", "hello");
             Statement statement = connection.createStatement();
             statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
             statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
 
-            fecther.open(connection, "mysql-bin.000001", 4L, 2);
+            fecther.open(connection, "mysql-bin.000006", 120L, 2);
 
-            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.UNKNOWN_EVENT);
             LogContext context = new LogContext();
             while (fecther.fetch()) {
-                LogEvent event = null;
-                event = decoder.decode(fecther, context);
-
-                if (event == null) {
-                    continue;
-                    // throw new RuntimeException("parse failed");
-                }
-
-                int eventType = event.getHeader().getType();
-                switch (eventType) {
-                    case LogEvent.ROTATE_EVENT:
-                        binlogFileName = ((RotateLogEvent) event).getFilename();
-                        break;
-                    case LogEvent.WRITE_ROWS_EVENT_V1:
-                    case LogEvent.WRITE_ROWS_EVENT:
-                        parseRowsEvent((WriteRowsLogEvent) event);
-                        break;
-                    case LogEvent.UPDATE_ROWS_EVENT_V1:
-                    case LogEvent.UPDATE_ROWS_EVENT:
-                        parseRowsEvent((UpdateRowsLogEvent) event);
-                        break;
-                    case LogEvent.DELETE_ROWS_EVENT_V1:
-                    case LogEvent.DELETE_ROWS_EVENT:
-                        parseRowsEvent((DeleteRowsLogEvent) event);
-                        break;
-                    case LogEvent.QUERY_EVENT:
-                        parseQueryEvent((QueryLogEvent) event);
-                        break;
-                    case LogEvent.ROWS_QUERY_LOG_EVENT:
-                        parseRowsQueryEvent((RowsQueryLogEvent) event);
-                        break;
-                    case LogEvent.ANNOTATE_ROWS_EVENT:
-                        parseAnnotateRowsEvent((AnnotateRowsEvent) event);
-                        break;
-                    case LogEvent.XID_EVENT:
-                        parseXidEvent((XidLogEvent) event);
-                        break;
-                    default:
-                        break;
-                }
+                decoder.decode(fecther, context);
+                continue;
+                // if (event == null) {
+                // continue;
+                // }
+                //
+                // int eventType = event.getHeader().getType();
+                // switch (eventType) {
+                // case LogEvent.ROTATE_EVENT:
+                // binlogFileName = ((RotateLogEvent) event).getFilename();
+                // break;
+                // case LogEvent.WRITE_ROWS_EVENT_V1:
+                // case LogEvent.WRITE_ROWS_EVENT:
+                // parseRowsEvent((WriteRowsLogEvent) event);
+                // break;
+                // case LogEvent.UPDATE_ROWS_EVENT_V1:
+                // case LogEvent.UPDATE_ROWS_EVENT:
+                // parseRowsEvent((UpdateRowsLogEvent) event);
+                // break;
+                // case LogEvent.DELETE_ROWS_EVENT_V1:
+                // case LogEvent.DELETE_ROWS_EVENT:
+                // parseRowsEvent((DeleteRowsLogEvent) event);
+                // break;
+                // case LogEvent.QUERY_EVENT:
+                // parseQueryEvent((QueryLogEvent) event);
+                // break;
+                // case LogEvent.ROWS_QUERY_LOG_EVENT:
+                // parseRowsQueryEvent((RowsQueryLogEvent) event);
+                // break;
+                // case LogEvent.ANNOTATE_ROWS_EVENT:
+                // parseAnnotateRowsEvent((AnnotateRowsEvent) event);
+                // break;
+                // case LogEvent.XID_EVENT:
+                // parseXidEvent((XidLogEvent) event);
+                // break;
+                // default:
+                // break;
+                // }
             }
         } catch (Exception e) {
             e.printStackTrace();

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.parse.driver.mysql.socket;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -24,7 +25,7 @@ public class BioSocketChannel implements SocketChannel {
 
     BioSocketChannel(Socket socket) throws IOException{
         this.socket = socket;
-        this.input = socket.getInputStream();
+        this.input = new BufferedInputStream(socket.getInputStream(), 16384);
         this.output = socket.getOutputStream();
     }
 
@@ -164,5 +165,4 @@ public class BioSocketChannel implements SocketChannel {
         this.socket = null;
     }
 
-
 }

+ 2 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java

@@ -16,6 +16,8 @@ public abstract class BioSocketChannelPool {
         socket.setKeepAlive(true);
         socket.setReuseAddress(true);
         socket.connect(address, BioSocketChannel.DEFAULT_CONNECT_TIMEOUT);
+        System.out.println(socket.getReceiveBufferSize());
+        System.out.println(socket.getSendBufferSize());
         return new BioSocketChannel(socket);
     }
 

+ 3 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java

@@ -41,6 +41,9 @@ public class MysqlDumpTest {
         controller.setTsdbSpringXml("classpath:tsdb/h2-tsdb.xml");
         controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
         controller.setEventBlackFilter(new AviaterRegexFilter("canal_tsdb\\..*"));
+        controller.setParallel(true);
+        controller.setParallelBufferSize(4096);
+        controller.setParallelThreadSize(16);
         controller.setIsGTIDMode(false);
         controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {