Jelajahi Sumber

fixed issue #726, 多线程解析binlog DML事件,提升性能

七锋 6 tahun lalu
induk
melakukan
da8bf59d5b
24 mengubah file dengan 879 tambahan dan 106 penghapusan
  1. 0 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java
  2. 2 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
  3. 13 0
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java
  4. 1 1
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java
  5. 5 0
      deployer/src/main/resources/canal.properties
  6. 5 0
      deployer/src/main/resources/spring/default-instance.xml
  7. 5 0
      deployer/src/main/resources/spring/file-instance.xml
  8. 10 0
      deployer/src/main/resources/spring/group-instance.xml
  9. 5 0
      deployer/src/main/resources/spring/local-instance.xml
  10. 5 0
      deployer/src/main/resources/spring/memory-instance.xml
  11. 32 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java
  12. 7 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java
  13. 2 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java
  14. 4 0
      parse/pom.xml
  15. 68 10
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  16. 8 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java
  17. 29 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java
  18. 9 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  19. 124 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  20. 63 13
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  21. 391 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  22. 6 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java
  23. 80 74
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  24. 5 0
      pom.xml

+ 0 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -58,7 +58,6 @@ public final class LogContext {
 
     public void reset() {
         formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
-
         mapOfTable.clear();
     }
 }

+ 2 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -109,6 +109,8 @@ public final class LogDecoder {
                         /* Decoding binary-log to event */
                         event = decode(buffer, header, context);
                         if (event != null) {
+                            // set logFileName
+                            event.getHeader().setLogFileName(context.getLogPosition().getFileName());
                             event.setSemival(buffer.semival);
                         }
                     } catch (IOException e) {

+ 13 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/LogHeader.java

@@ -115,6 +115,11 @@ public final class LogHeader {
      */
     protected long      crc;        // ha_checksum
 
+    /**
+     * binlog fileName
+     */
+    protected String    logFileName;
+
     /* for Start_event_v3 */
     public LogHeader(final int type){
         this.type = type;
@@ -270,6 +275,14 @@ public final class LogHeader {
         return checksumAlg;
     }
 
+    public String getLogFileName() {
+        return logFileName;
+    }
+
+    public void setLogFileName(String logFileName) {
+        this.logFileName = logFileName;
+    }
+
     private void processCheckSum(LogBuffer buffer) {
         if (checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_OFF && checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_UNDEF) {
             crc = buffer.getUint32(eventLen - LogEvent.BINLOG_CHECKSUM_LEN);

+ 1 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java

@@ -194,7 +194,7 @@ public abstract class RowsLogEvent extends LogEvent {
     }
 
     public final RowsLogBuffer getRowsBuf(String charsetName) {
-        return new RowsLogBuffer(rowsBuf.duplicate(), columnLen, charsetName);
+        return new RowsLogBuffer(rowsBuf, columnLen, charsetName);
     }
 
     public final int getFlags(final int flags) {

+ 5 - 0
deployer/src/main/resources/canal.properties

@@ -51,6 +51,11 @@ canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
 # binlog ddl isolation
 canal.instance.get.ddl.isolation = false
 
+# parallel parser config
+canal.instance.parser.parallel = true
+canal.instance.parser.parallelThreadSize = 16
+canal.instance.parser.parallelBufferSize = 256
+
 #################################################
 ######### 		destinations		############# 
 #################################################

+ 5 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -192,5 +192,10 @@
 		
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
+		
+		<!-- parallel parser -->
+		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 5 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -177,5 +177,10 @@
 
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
+		
+		<!-- parallel parser -->
+		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 10 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -166,6 +166,11 @@
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
+		
+		<!-- parallel parser -->
+		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 	
 	<bean id="eventParser2" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
@@ -260,5 +265,10 @@
 		<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
 		<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
 		<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
+		
+		<!-- parallel parser -->
+		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 5 - 0
deployer/src/main/resources/spring/local-instance.xml

@@ -137,5 +137,10 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		
+		<!-- parallel parser -->
+		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 5 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -165,5 +165,10 @@
 		
 		<!--是否启用GTID模式-->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
+		
+		<!-- parallel parser -->
+		<property name="parallel" value="${canal.instance.parser.parallel:true}" />
+		<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
+		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 	</bean>
 </beans>

+ 32 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java

@@ -93,6 +93,37 @@ public class BioSocketChannel implements SocketChannel {
         return data;
     }
 
+    @Override
+    public void read(byte[] data, int off, int len, int timeout) throws IOException {
+        InputStream input = this.input;
+        int accTimeout = 0;
+        if (input == null) {
+            throw new SocketException("Socket already closed.");
+        }
+
+        int n = 0;
+        while (n < len && accTimeout < timeout) {
+            try {
+                int read = input.read(data, off + n, len - n);
+                if (read > -1) {
+                    n += read;
+                } else {
+                    throw new IOException("EOF encountered.");
+                }
+            } catch (SocketTimeoutException te) {
+                if (Thread.interrupted()) {
+                    throw new ClosedByInterruptException();
+                }
+                accTimeout += SO_TIMEOUT;
+            }
+        }
+
+        if (n < len && accTimeout >= timeout) {
+            throw new SocketTimeoutException("Timeout occurred, failed to read " + len + " bytes in " + timeout
+                                             + " milliseconds.");
+        }
+    }
+
     public boolean isConnected() {
         Socket socket = this.socket;
         if (socket != null) {
@@ -133,4 +164,5 @@ public class BioSocketChannel implements SocketChannel {
         this.socket = null;
     }
 
+
 }

+ 7 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java

@@ -11,6 +11,7 @@ import io.netty.util.internal.SystemPropertyUtil;
 import java.io.IOException;
 import java.net.SocketAddress;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -202,6 +203,11 @@ public class NettySocketChannel implements SocketChannel {
         } while (true);
     }
 
+    @Override
+    public void read(byte[] data, int off, int len, int timeout) throws IOException {
+        throw new NotImplementedException();
+    }
+
     public boolean isConnected() {
         return channel != null ? true : false;
     }
@@ -224,4 +230,5 @@ public class NettySocketChannel implements SocketChannel {
         }
     }
 
+
 }

+ 2 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -15,6 +15,8 @@ public interface SocketChannel {
 
     public byte[] read(int readSize, int timeout) throws IOException;
 
+    public void read(byte[] data, int off, int len, int timeout) throws IOException;
+
     public boolean isConnected();
 
     public SocketAddress getRemoteSocketAddress();

+ 4 - 0
parse/pom.xml

@@ -73,6 +73,10 @@
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-compress</artifactId>
 		</dependency>
+		<dependency>
+		    <groupId>com.lmax</groupId>
+		    <artifactId>disruptor</artifactId>
+		</dependency>
 		<!-- test dependency -->
 		<dependency>
 			<groupId>junit</groupId>

+ 68 - 10
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -89,11 +89,18 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected Throwable                              exception                  = null;
 
     protected boolean                                isGTIDMode                 = false;                                   // 是否是GTID模式
+    protected boolean                                parallel                   = true;                                    // 是否开启并行解析模式
+    protected int                                    parallelThreadSize         = Runtime.getRuntime()
+                                                                                    .availableProcessors() * 60 / 100;     // 60%的能力跑解析,剩余部分处理网络
+    protected int                                    parallelBufferSize         = 16 * parallelThreadSize;
+    protected MultiStageCoprocessor                  multiStageCoprocessor;
 
     protected abstract BinlogParser buildParser();
 
     protected abstract ErosaConnection buildErosaConnection();
 
+    protected abstract MultiStageCoprocessor buildMultiStageCoprocessor();
+
     protected abstract EntryPosition findStartPosition(ErosaConnection connection) throws IOException;
 
     protected void preDump(ErosaConnection connection) {
@@ -217,20 +224,39 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         };
 
                         // 4. 开始dump数据
-                        // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
-                        if (isGTIDMode()) {
-                            erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
+                        if (parallel) {
+                            // build stage processor
+                            multiStageCoprocessor = buildMultiStageCoprocessor();
+                            multiStageCoprocessor.start();
+
+                            if (isGTIDMode()) {
+                                // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
+                                erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), multiStageCoprocessor);
+                            } else {
+                                if (StringUtils.isEmpty(startPosition.getJournalName())
+                                    && startPosition.getTimestamp() != null) {
+                                    erosaConnection.dump(startPosition.getTimestamp(), multiStageCoprocessor);
+                                } else {
+                                    erosaConnection.dump(startPosition.getJournalName(),
+                                        startPosition.getPosition(),
+                                        multiStageCoprocessor);
+                                }
+                            }
                         } else {
-                            if (StringUtils.isEmpty(startPosition.getJournalName())
-                                && startPosition.getTimestamp() != null) {
-                                erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
+                            if (isGTIDMode()) {
+                                // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
+                                erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
                             } else {
-                                erosaConnection.dump(startPosition.getJournalName(),
-                                    startPosition.getPosition(),
-                                    sinkHandler);
+                                if (StringUtils.isEmpty(startPosition.getJournalName())
+                                    && startPosition.getTimestamp() != null) {
+                                    erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
+                                } else {
+                                    erosaConnection.dump(startPosition.getJournalName(),
+                                        startPosition.getPosition(),
+                                        sinkHandler);
+                                }
                             }
                         }
-
                     } catch (TableIdNotFoundException e) {
                         exception = e;
                         // 特殊处理TableIdNotFound异常,出现这样的异常,一种可能就是起始的position是一个事务当中,导致tablemap
@@ -276,6 +302,9 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                     eventSink.interrupt();
                     transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
                     binlogParser.reset();// 重新置位
+                    if (multiStageCoprocessor != null) {
+                        multiStageCoprocessor.reset();
+                    }
 
                     if (running) {
                         // sleep一段时间再进行重试
@@ -314,6 +343,10 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         if (transactionBuffer.isStart()) {
             transactionBuffer.stop();
         }
+
+        if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
+            multiStageCoprocessor.stop();
+        }
     }
 
     protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
@@ -549,4 +582,29 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     public void setIsGTIDMode(boolean isGTIDMode) {
         this.isGTIDMode = isGTIDMode;
     }
+
+    public boolean isParallel() {
+        return parallel;
+    }
+
+    public void setParallel(boolean parallel) {
+        this.parallel = parallel;
+    }
+
+    public int getParallelThreadSize() {
+        return parallelThreadSize;
+    }
+
+    public void setParallelThreadSize(int parallelThreadSize) {
+        this.parallelThreadSize = parallelThreadSize;
+    }
+
+    public int getParallelBufferSize() {
+        return parallelBufferSize;
+    }
+
+    public void setParallelBufferSize(int parallelBufferSize) {
+        this.parallelBufferSize = parallelBufferSize;
+    }
+
 }

+ 8 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

@@ -28,12 +28,16 @@ public interface ErosaConnection {
 
     /**
      * 通过GTID同步binlog
-     *
-     * @param gtidSet
-     * @param func
-     * @throws IOException
      */
     public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException;
 
+    // -------------
+
+    public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException;
+
+    public void dump(long timestamp, MultiStageCoprocessor coprocessor) throws IOException;
+
+    public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException;
+
     ErosaConnection fork();
 }

+ 29 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.parse.inbound;
+
+import com.alibaba.otter.canal.common.CanalLifeCycle;
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+
+/**
+ * 针对解析器提供一个多阶段协同的处理
+ * 
+ * <pre>
+ * 1. 网络接收 (单线程)
+ * 2. 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息)
+ * 3. 事件深度解析 (多线程, DML事件数据的完整解析)
+ * 4. 投递到store (单线程)
+ * </pre>
+ * 
+ * @author agapple 2018年7月3日 下午4:54:17
+ * @since 1.0.26
+ */
+public interface MultiStageCoprocessor extends CanalLifeCycle {
+
+    /**
+     * 网络数据投递
+     */
+    public void publish(LogBuffer buffer);
+
+    public void publish(LogBuffer buffer, String binlogFileName);
+
+    public void reset();
+}

+ 9 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -11,6 +11,7 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
+import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
@@ -117,6 +118,14 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         }
     }
 
+    protected MultiStageCoprocessor buildMultiStageCoprocessor() {
+        return new MysqlMultiStageCoprocessor(parallelBufferSize,
+            parallelThreadSize,
+            (LogEventConvert) binlogParser,
+            transactionBuffer,
+            destination);
+    }
+
     // ============================ setter / getter =========================
 
     public void setConnectionCharsetNumber(byte connectionCharsetNumber) {

+ 124 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -11,9 +11,11 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
+import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
 import com.taobao.tddl.dbsync.binlog.FileLogFetcher;
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
@@ -207,6 +209,128 @@ public class LocalBinLogConnection implements ErosaConnection {
         throw new NotImplementedException();
     }
 
+    @Override
+    public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
+        File current = new File(directory, binlogfilename);
+
+        FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
+        try {
+            fetcher.open(current, binlogPosition);
+            while (running) {
+                boolean needContinue = true;
+                while (fetcher.fetch()) {
+                    LogBuffer buffer = fetcher.duplicate();
+                    fetcher.consume(fetcher.limit());
+                    coprocessor.publish(buffer, binlogfilename); // set filename
+                }
+
+                if (needContinue) {// 读取下一个
+                    fetcher.close(); // 关闭上一个文件
+
+                    File nextFile;
+                    if (needWait) {
+                        nextFile = binlogs.waitForNextFile(current);
+                    } else {
+                        nextFile = binlogs.getNextFile(current);
+                    }
+
+                    if (nextFile == null) {
+                        break;
+                    }
+
+                    current = nextFile;
+                    fetcher.open(current);
+                    binlogfilename = nextFile.getName();
+                } else {
+                    break;// 跳出
+                }
+            }
+        } catch (InterruptedException e) {
+            logger.warn("LocalBinLogConnection dump interrupted");
+        } finally {
+            if (fetcher != null) {
+                fetcher.close();
+            }
+        }
+    }
+
+    @Override
+    public void dump(long timestampMills, MultiStageCoprocessor coprocessor) throws IOException {
+        List<File> currentBinlogs = binlogs.currentBinlogs();
+        File current = currentBinlogs.get(currentBinlogs.size() - 1);
+        long timestampSeconds = timestampMills / 1000;
+
+        String binlogFilename = null;
+        long binlogFileOffset = 0;
+
+        FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
+        LogDecoder decoder = new LogDecoder();
+        decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
+        decoder.handle(LogEvent.QUERY_EVENT);
+        decoder.handle(LogEvent.XID_EVENT);
+        LogContext context = new LogContext();
+        try {
+            fetcher.open(current);
+            context.setLogPosition(new LogPosition(current.getName()));
+            while (running) {
+                boolean needContinue = true;
+                String lastXidLogFilename = current.getName();
+                long lastXidLogFileOffset = 0;
+
+                binlogFilename = lastXidLogFilename;
+                binlogFileOffset = lastXidLogFileOffset;
+                while (fetcher.fetch()) {
+                    LogEvent event = decoder.decode(fetcher, context);
+                    if (event != null) {
+                        if (event.getWhen() > timestampSeconds) {
+                            break;
+                        }
+
+                        needContinue = false;
+                        if (LogEvent.QUERY_EVENT == event.getHeader().getType()) {
+                            if (StringUtils.endsWithIgnoreCase(((QueryLogEvent) event).getQuery(), "BEGIN")) {
+                                binlogFilename = lastXidLogFilename;
+                                binlogFileOffset = lastXidLogFileOffset;
+                            } else if (StringUtils.endsWithIgnoreCase(((QueryLogEvent) event).getQuery(), "COMMIT")) {
+                                lastXidLogFilename = current.getName();
+                                lastXidLogFileOffset = event.getLogPos();
+                            }
+                        } else if (LogEvent.XID_EVENT == event.getHeader().getType()) {
+                            lastXidLogFilename = current.getName();
+                            lastXidLogFileOffset = event.getLogPos();
+                        }
+                    }
+                }
+
+                if (needContinue) {// 读取下一个
+                    fetcher.close(); // 关闭上一个文件
+
+                    File nextFile = binlogs.getBefore(current);
+                    if (nextFile == null) {
+                        break;
+                    }
+
+                    current = nextFile;
+                    fetcher.open(current);
+                    context.setLogPosition(new LogPosition(current.getName()));
+                } else {
+                    break;// 跳出
+                }
+            }
+        } finally {
+            if (fetcher != null) {
+                fetcher.close();
+            }
+        }
+
+        dump(binlogFilename, binlogFileOffset, coprocessor);
+    }
+
+    @Override
+    public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException {
+        throw new NotImplementedException();
+    }
+
     public ErosaConnection fork() {
         LocalBinLogConnection connection = new LocalBinLogConnection();
 

+ 63 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -26,9 +26,11 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
+import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
@@ -156,7 +158,7 @@ public class MysqlConnection implements ErosaConnection {
             }
 
             if (event.getSemival() == 1) {
-                sendSemiAck(context.getLogPosition().getFileName(), binlogPosition);
+                sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
             }
         }
     }
@@ -167,27 +169,74 @@ public class MysqlConnection implements ErosaConnection {
         sendBinlogDumpGTID(gtidSet);
 
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
-        fetcher.start(connector.getChannel());
-        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
-        LogContext context = new LogContext();
-        while (fetcher.fetch()) {
-            LogEvent event = null;
-            event = decoder.decode(fetcher, context);
+        try {
+            fetcher.start(connector.getChannel());
+            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+            LogContext context = new LogContext();
+            while (fetcher.fetch()) {
+                LogEvent event = null;
+                event = decoder.decode(fetcher, context);
+
+                if (event == null) {
+                    throw new CanalParseException("parse failed");
+                }
 
-            if (event == null) {
-                throw new CanalParseException("parse failed");
+                if (!func.sink(event)) {
+                    break;
+                }
             }
+        } finally {
+            fetcher.close();
+        }
+    }
 
-            if (!func.sink(event)) {
-                break;
+    public void dump(long timestamp, SinkFunction func) throws IOException {
+        throw new NullPointerException("Not implement yet");
+    }
+
+    @Override
+    public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
+        updateSettings();
+        sendRegisterSlave();
+        sendBinlogDump(binlogfilename, binlogPosition);
+        ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
+        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
+        try {
+            fetcher.start(connector.getChannel());
+            while (fetcher.fetch()) {
+                LogBuffer buffer = fetcher.duplicate();
+                fetcher.consume(fetcher.limit());
+                coprocessor.publish(buffer);
             }
+        } finally {
+            fetcher.close();
         }
     }
 
-    public void dump(long timestamp, SinkFunction func) throws IOException {
+    @Override
+    public void dump(long timestamp, MultiStageCoprocessor coprocessor) throws IOException {
         throw new NullPointerException("Not implement yet");
     }
 
+    @Override
+    public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException {
+        updateSettings();
+        sendBinlogDumpGTID(gtidSet);
+
+        ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
+        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
+        try {
+            fetcher.start(connector.getChannel());
+            while (fetcher.fetch()) {
+                LogBuffer buffer = fetcher.duplicate();
+                fetcher.consume(fetcher.limit());
+                coprocessor.publish(buffer);
+            }
+        } finally {
+            fetcher.close();
+        }
+    }
+
     private void sendRegisterSlave() throws IOException {
         RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
         cmd.reportHost = authInfo.getAddress().getAddress().getHostAddress();
@@ -233,7 +282,7 @@ public class MysqlConnection implements ErosaConnection {
         connector.setDumping(true);
     }
 
-    private void sendSemiAck(String binlogfilename, Long binlogPosition) throws IOException {
+    public void sendSemiAck(String binlogfilename, Long binlogPosition) throws IOException {
         SemiAckCommandPacket semiAckCmd = new SemiAckCommandPacket();
         semiAckCmd.binlogFileName = binlogfilename;
         semiAckCmd.binlogPosition = binlogPosition;
@@ -538,4 +587,5 @@ public class MysqlConnection implements ErosaConnection {
     public void setAuthInfo(AuthenticationInfo authInfo) {
         this.authInfo = authInfo;
     }
+
 }

+ 391 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -0,0 +1,391 @@
+package com.alibaba.otter.canal.parse.inbound.mysql;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
+import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer;
+import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.lmax.disruptor.BatchEventProcessor;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.FatalExceptionHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.Sequence;
+import com.lmax.disruptor.SequenceBarrier;
+import com.lmax.disruptor.WorkHandler;
+import com.lmax.disruptor.WorkerPool;
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogContext;
+import com.taobao.tddl.dbsync.binlog.LogDecoder;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+import com.taobao.tddl.dbsync.binlog.LogPosition;
+import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
+
+/**
+ * A multi-stage cooperative pipeline for binlog parsing, built on the LMAX
+ * disruptor ring buffer:
+ *
+ * <pre>
+ * 1. network receive     (single producer thread calling publish())
+ * 2. basic event decode  (single thread: event type, DDL parsing / TableMeta
+ *    lookup, binlog position bookkeeping)
+ * 3. deep event parse    (parserThreadCount threads: full decode of DML row data)
+ * 4. sink into the store (single thread, preserves event order)
+ * </pre>
+ *
+ * @author agapple 2018年7月3日 下午4:54:17
+ * @since 1.0.26
+ */
+public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
+
+    private LogEventConvert              logEventConvert;
+    private EventTransactionBuffer       transactionBuffer;
+    private ErosaConnection              connection;
+
+    private int                          parserThreadCount;
+    private int                          ringBufferSize;
+    private RingBuffer<MessageEvent>     disruptorMsgBuffer;
+    private ExecutorService              parserExecutor;
+    private ExecutorService              stageExecutor;
+    private String                       destination;
+    // written by the stage threads on failure and read by the producer thread
+    // in publish(); volatile is required for cross-thread visibility
+    // (fix: was a plain field, so the producer could miss the failure)
+    private volatile CanalParseException exception;
+
+    public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
+                                      EventTransactionBuffer transactionBuffer, String destination){
+        this.ringBufferSize = ringBufferSize;
+        this.parserThreadCount = parserThreadCount;
+        this.logEventConvert = logEventConvert;
+        this.transactionBuffer = transactionBuffer;
+        this.destination = destination;
+    }
+
+    /**
+     * Builds the disruptor pipeline and starts every stage. Stages 2 and 4 are
+     * single-threaded BatchEventProcessors (ordering matters there); stage 3 is
+     * a WorkerPool of parserThreadCount DML decoders gated behind stage 2.
+     */
+    @Override
+    public void start() {
+        super.start();
+        this.exception = null;
+        this.disruptorMsgBuffer = RingBuffer.createSingleProducer(new MessageEventFactory(),
+            ringBufferSize,
+            new BlockingWaitStrategy());
+
+        this.parserExecutor = Executors.newFixedThreadPool(parserThreadCount,
+            new NamedThreadFactory("MultiStageCoprocessor-Parser-" + destination));
+
+        this.stageExecutor = Executors.newFixedThreadPool(2, new NamedThreadFactory("MultiStageCoprocessor-other-"
+                                                                                    + destination));
+        SequenceBarrier sequenceBarrier = disruptorMsgBuffer.newBarrier();
+
+        // stage 2: single-threaded basic decode
+        BatchEventProcessor<MessageEvent> simpleParserStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
+            sequenceBarrier,
+            new SimpleParserStage());
+        disruptorMsgBuffer.addGatingSequences(simpleParserStage.getSequence());
+
+        // stage 3: multi-threaded DML decode, may only see events stage 2 finished
+        SequenceBarrier dmlParserSequenceBarrier = disruptorMsgBuffer.newBarrier(simpleParserStage.getSequence());
+        WorkHandler<MessageEvent>[] workHandlers = new DmlParserStage[parserThreadCount];
+        for (int i = 0; i < parserThreadCount; i++) {
+            workHandlers[i] = new DmlParserStage();
+        }
+        WorkerPool<MessageEvent> workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,
+            dmlParserSequenceBarrier,
+            new FatalExceptionHandler(),
+            workHandlers);
+        Sequence[] sequence = workerPool.getWorkerSequences();
+        disruptorMsgBuffer.addGatingSequences(sequence);
+
+        // stage 4: single-threaded sink, gated behind every stage-3 worker
+        SequenceBarrier sinkSequenceBarrier = disruptorMsgBuffer.newBarrier(sequence);
+        BatchEventProcessor<MessageEvent> sinkStoreStage = new BatchEventProcessor<MessageEvent>(disruptorMsgBuffer,
+            sinkSequenceBarrier,
+            new SinkStoreStage());
+        disruptorMsgBuffer.addGatingSequences(sinkStoreStage.getSequence());
+
+        // start work
+        stageExecutor.submit(simpleParserStage);
+        stageExecutor.submit(sinkStoreStage);
+        workerPool.start(parserExecutor);
+    }
+
+    @Override
+    public void stop() {
+        try {
+            parserExecutor.shutdownNow();
+        } catch (Throwable e) {
+            // ignore: best-effort shutdown
+        }
+
+        try {
+            stageExecutor.shutdownNow();
+        } catch (Throwable e) {
+            // ignore: best-effort shutdown
+        }
+        disruptorMsgBuffer = null;
+        super.stop();
+    }
+
+    /**
+     * Network-stage entry point: hands one raw binlog packet to the pipeline.
+     * Single-threaded by contract of {@code RingBuffer.createSingleProducer}.
+     */
+    public void publish(LogBuffer buffer) {
+        // fail fast once any stage has died: its gating sequence stops
+        // advancing and RingBuffer.next() would block this producer forever.
+        // (fix: the old check "!isStart() && exception != null" only fired
+        // after stop(), so a stage failure could hang the producer thread)
+        if (exception != null) {
+            throw exception;
+        }
+        long next = disruptorMsgBuffer.next();
+        MessageEvent event = disruptorMsgBuffer.get(next);
+        event.setBuffer(buffer);
+        disruptorMsgBuffer.publish(next);
+    }
+
+    /**
+     * Same as {@link #publish(LogBuffer)} but also carries the binlog file
+     * name, used by the local-binlog parse mode where no ROTATE packet arrives
+     * on the wire.
+     */
+    public void publish(LogBuffer buffer, String binlogFileName) {
+        if (exception != null) {
+            throw exception;
+        }
+        long next = disruptorMsgBuffer.next();
+        MessageEvent event = disruptorMsgBuffer.get(next);
+        event.setBuffer(buffer);
+        event.setBinlogFileName(binlogFileName);
+        disruptorMsgBuffer.publish(next);
+    }
+
+    @Override
+    public void reset() {
+        stop();
+        start();
+    }
+
+    /**
+     * Stage 2: decodes the raw buffer into a LogEvent, resolves the TableMeta
+     * for DML events (so stage 3 needs no further metadata lookups) and fully
+     * parses every non-DML event inline.
+     */
+    private class SimpleParserStage implements EventHandler<MessageEvent>, LifecycleAware {
+
+        private LogDecoder decoder;
+        private LogContext context;
+
+        public SimpleParserStage(){
+            decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
+            context = new LogContext();
+        }
+
+        public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
+            try {
+                LogBuffer buffer = event.getBuffer();
+                if (StringUtils.isNotEmpty(event.getBinlogFileName())
+                    && !context.getLogPosition().getFileName().equals(event.getBinlogFileName())) {
+                    // set rotate binlog file name (local parse mode), keeping
+                    // the current offset.
+                    // NOTE(review): assumes context.getLogPosition() is already
+                    // non-null when a file name is published — confirm for the
+                    // very first event of a local binlog file
+                    context.setLogPosition(new LogPosition(event.getBinlogFileName(), context.getLogPosition()
+                        .getPosition()));
+                }
+
+                LogEvent logEvent = decoder.decode(buffer, context);
+                event.setEvent(logEvent);
+
+                int eventType = logEvent.getHeader().getType();
+                TableMeta tableMeta = null;
+                boolean needDmlParse = false;
+                switch (eventType) {
+                    case LogEvent.WRITE_ROWS_EVENT_V1:
+                    case LogEvent.WRITE_ROWS_EVENT:
+                        tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
+                        needDmlParse = true;
+                        break;
+                    case LogEvent.UPDATE_ROWS_EVENT_V1:
+                    case LogEvent.UPDATE_ROWS_EVENT:
+                        tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
+                        needDmlParse = true;
+                        break;
+                    case LogEvent.DELETE_ROWS_EVENT_V1:
+                    case LogEvent.DELETE_ROWS_EVENT:
+                        tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
+                        needDmlParse = true;
+                        break;
+                    case LogEvent.ROWS_QUERY_LOG_EVENT:
+                        needDmlParse = true;
+                        break;
+                    default:
+                        // non-DML events are cheap; parse them inline here
+                        CanalEntry.Entry entry = logEventConvert.parse(event.getEvent(), false);
+                        event.setEntry(entry);
+                }
+
+                // remember the DML table structure for the stage-3 workers
+                event.setNeedDmlParse(needDmlParse);
+                event.setTable(tableMeta);
+            } catch (Throwable e) {
+                exception = new CanalParseException(e);
+                throw exception;
+            }
+        }
+
+        @Override
+        public void onStart() {
+
+        }
+
+        @Override
+        public void onShutdown() {
+            stop();
+        }
+    }
+
+    /**
+     * Stage 3: fully decodes DML row data in parallel; each event is handled
+     * by exactly one worker of the pool.
+     */
+    private class DmlParserStage implements WorkHandler<MessageEvent>, LifecycleAware {
+
+        @Override
+        public void onEvent(MessageEvent event) throws Exception {
+            try {
+                if (event.isNeedDmlParse()) {
+                    int eventType = event.getEvent().getHeader().getType();
+                    CanalEntry.Entry entry = null;
+                    switch (eventType) {
+                        case LogEvent.ROWS_QUERY_LOG_EVENT:
+                            entry = logEventConvert.parse(event.getEvent(), false);
+                            break;
+                        default:
+                            // parse the DML event with the TableMeta stage 2 resolved
+                            entry = logEventConvert.parseRowsEvent((RowsLogEvent) event.getEvent(), event.getTable());
+                    }
+
+                    event.setEntry(entry);
+                }
+            } catch (Throwable e) {
+                exception = new CanalParseException(e);
+                throw exception;
+            }
+        }
+
+        @Override
+        public void onStart() {
+
+        }
+
+        @Override
+        public void onShutdown() {
+            stop();
+        }
+    }
+
+    /**
+     * Stage 4: pushes parsed entries into the transaction buffer in ring order
+     * and answers semi-sync ACKs, then clears the slot for reuse.
+     */
+    private class SinkStoreStage implements EventHandler<MessageEvent>, LifecycleAware {
+
+        public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
+            try {
+                if (event.getEntry() != null) {
+                    transactionBuffer.add(event.getEntry());
+                }
+
+                LogEvent logEvent = event.getEvent();
+                if (connection instanceof MysqlConnection && logEvent.getSemival() == 1) {
+                    // semi-sync replication ACK back to the master
+                    ((MysqlConnection) connection).sendSemiAck(logEvent.getHeader().getLogFileName(),
+                        logEvent.getHeader().getLogPos());
+                }
+
+                // clear for gc: the slot stays alive inside the ring buffer
+                event.setBuffer(null);
+                event.setBinlogFileName(null);
+                event.setEvent(null);
+                event.setTable(null);
+                event.setEntry(null);
+                event.setNeedDmlParse(false);
+            } catch (Throwable e) {
+                exception = new CanalParseException(e);
+                throw exception;
+            }
+        }
+
+        @Override
+        public void onStart() {
+
+        }
+
+        @Override
+        public void onShutdown() {
+            stop();
+        }
+    }
+
+    /**
+     * Ring-buffer slot carrying one packet through all stages. Static nested
+     * class: it never touches the enclosing instance (fix: was an inner class
+     * holding a hidden reference to the coprocessor).
+     */
+    static class MessageEvent {
+
+        private String           binlogFileName;      // for local binlog parse
+        private LogBuffer        buffer;
+        private CanalEntry.Entry entry;
+        private boolean          needDmlParse = false;
+        private TableMeta        table;
+        private LogEvent         event;
+
+        public String getBinlogFileName() {
+            return binlogFileName;
+        }
+
+        public void setBinlogFileName(String binlogFileName) {
+            this.binlogFileName = binlogFileName;
+        }
+
+        public LogBuffer getBuffer() {
+            return buffer;
+        }
+
+        public void setBuffer(LogBuffer buffer) {
+            this.buffer = buffer;
+        }
+
+        public LogEvent getEvent() {
+            return event;
+        }
+
+        public void setEvent(LogEvent event) {
+            this.event = event;
+        }
+
+        public CanalEntry.Entry getEntry() {
+            return entry;
+        }
+
+        public void setEntry(CanalEntry.Entry entry) {
+            this.entry = entry;
+        }
+
+        public boolean isNeedDmlParse() {
+            return needDmlParse;
+        }
+
+        public void setNeedDmlParse(boolean needDmlParse) {
+            this.needDmlParse = needDmlParse;
+        }
+
+        public TableMeta getTable() {
+            return table;
+        }
+
+        public void setTable(TableMeta table) {
+            this.table = table;
+        }
+
+    }
+
+    static class MessageEventFactory implements EventFactory<MessageEvent> {
+
+        public MessageEvent newInstance() {
+            return new MessageEvent();
+        }
+    }
+
+    public void setLogEventConvert(LogEventConvert logEventConvert) {
+        this.logEventConvert = logEventConvert;
+    }
+
+    public void setTransactionBuffer(EventTransactionBuffer transactionBuffer) {
+        this.transactionBuffer = transactionBuffer;
+    }
+
+    public void setConnection(ErosaConnection connection) {
+        this.connection = connection;
+    }
+
+}

+ 6 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -171,10 +171,13 @@ public class DirectLogFetcher extends LogFetcher {
     private final boolean fetch0(final int off, final int len) throws IOException {
         ensureCapacity(off + len);
 
-        byte[] read = channel.read(len, READ_TIMEOUT_MILLISECONDS);
-        System.arraycopy(read, 0, this.buffer, off, len);
+        // read directly into this fetcher's buffer instead of allocating an
+        // intermediate byte[] per packet — saves one copy on every network read
 
-        if (limit < off + len) limit = off + len;
+        channel.read(buffer, off, len, READ_TIMEOUT_MILLISECONDS);
+        if (limit < off + len) {
+            limit = off + len;
+        }
         return true;
     }
 

+ 80 - 74
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -46,7 +46,6 @@ import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.LogHeader;
 import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RandLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer;
 import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
@@ -89,7 +88,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private volatile AviaterRegexFilter nameBlackFilter;
 
     private TableMetaCache              tableMetaCache;
-    private String                      binlogFileName      = "mysql-bin.000001";
     private Charset                     charset             = Charset.defaultCharset();
     private boolean                     filterQueryDcl      = false;
     private boolean                     filterQueryDml      = false;
@@ -119,9 +117,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
         int eventType = logEvent.getHeader().getType();
         switch (eventType) {
-            case LogEvent.ROTATE_EVENT:
-                binlogFileName = ((RotateLogEvent) logEvent).getFilename();
-                break;
             case LogEvent.QUERY_EVENT:
                 return parseQueryEvent((QueryLogEvent) logEvent, isSeek);
             case LogEvent.XID_EVENT:
@@ -158,7 +153,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     public void reset() {
         // do nothing
-        binlogFileName = "mysql-bin.000001";
         if (tableMetaCache != null) {
             tableMetaCache.clearTableMeta();
         }
@@ -174,7 +168,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             gtidSet.update(value);
         }
 
-        Header header = createHeader("", logHeader, "", "", EventType.GTID);
+        Header header = createHeader(logHeader, "", "", EventType.GTID);
         return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());
     }
 
@@ -187,7 +181,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             beginBuilder.addProps(createSpecialPair(XA_TYPE, XA_START));
             beginBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_START)));
             TransactionBegin transactionBegin = beginBuilder.build();
-            Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+            Header header = createHeader(event.getHeader(), "", "", null);
             return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
         } else if (StringUtils.startsWithIgnoreCase(queryString, XA_END)) {
             // xa start use TransactionEnd
@@ -196,11 +190,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             endBuilder.addProps(createSpecialPair(XA_TYPE, XA_END));
             endBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_END)));
             TransactionEnd transactionEnd = endBuilder.build();
-            Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+            Header header = createHeader(event.getHeader(), "", "", null);
             return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
         } else if (StringUtils.startsWithIgnoreCase(queryString, XA_COMMIT)) {
             // xa commit
-            Header header = createHeader(binlogFileName, event.getHeader(), "", "", EventType.XACOMMIT);
+            Header header = createHeader(event.getHeader(), "", "", EventType.XACOMMIT);
             RowChange.Builder rowChangeBuider = RowChange.newBuilder();
             rowChangeBuider.setSql(queryString);
             rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_COMMIT));
@@ -209,7 +203,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
         } else if (StringUtils.startsWithIgnoreCase(queryString, XA_ROLLBACK)) {
             // xa rollback
-            Header header = createHeader(binlogFileName, event.getHeader(), "", "", EventType.XAROLLBACK);
+            Header header = createHeader(event.getHeader(), "", "", EventType.XAROLLBACK);
             RowChange.Builder rowChangeBuider = RowChange.newBuilder();
             rowChangeBuider.setSql(queryString);
             rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_ROLLBACK));
@@ -218,11 +212,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
         } else if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
             TransactionBegin transactionBegin = createTransactionBegin(event.getSessionId());
-            Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+            Header header = createHeader(event.getHeader(), "", "", null);
             return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
         } else if (StringUtils.endsWithIgnoreCase(queryString, COMMIT)) {
             TransactionEnd transactionEnd = createTransactionEnd(0L); // MyISAM可能不会有xid事件
-            Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+            Header header = createHeader(event.getHeader(), "", "", null);
             return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
         } else {
             boolean notFilter = false;
@@ -265,7 +259,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 tableMetaCache.apply(position, event.getDbName(), queryString, null);
             }
 
-            Header header = createHeader(binlogFileName, event.getHeader(), schemaName, tableName, type);
+            Header header = createHeader(event.getHeader(), schemaName, tableName, type);
             RowChange.Builder rowChangeBuider = RowChange.newBuilder();
             if (type != EventType.QUERY && type != EventType.INSERT && type != EventType.UPDATE
                 && type != EventType.DELETE) {
@@ -425,51 +419,77 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     private Entry parseXidEvent(XidLogEvent event) {
         TransactionEnd transactionEnd = createTransactionEnd(event.getXid());
-        Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
+        Header header = createHeader(event.getHeader(), "", "", null);
         return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
     }
 
-    private Entry parseRowsEvent(RowsLogEvent event) {
-        if (filterRows) {
+    /**
+     * Resolves the TableMeta for a row event without decoding the row data,
+     * so the expensive DML decode can run on the parallel stage. Returns null
+     * when the table is filtered out (or missing and filterTableError is set).
+     */
+    public TableMeta parseRowsEventForTableMeta(RowsLogEvent event) {
+        TableMapLogEvent table = event.getTable();
+        if (table == null) {
+            // no TableMapLogEvent was recorded for this tableId
+            throw new TableIdNotFoundException("not found tableId:" + event.getTableId());
+        }
+
+        boolean isHeartBeat = isAliSQLHeartBeat(table.getDbName(), table.getTableName());
+        boolean isRDSHeartBeat = tableMetaCache.isOnRDS() && isRDSHeartBeat(table.getDbName(), table.getTableName());
+
+        String fullname = table.getDbName() + "." + table.getTableName();
+        // check name filter
+        if (nameFilter != null && !nameFilter.filter(fullname)) {
+            return null;
+        }
+        if (nameBlackFilter != null && nameBlackFilter.filter(fullname)) {
             return null;
         }
-        try {
-            TableMapLogEvent table = event.getTable();
-            if (table == null) {
-                // tableId对应的记录不存在
-                throw new TableIdNotFoundException("not found tableId:" + event.getTableId());
-            }

-            boolean isHeartBeat = isAliSQLHeartBeat(table.getDbName(), table.getTableName());
-            boolean isRDSHeartBeat = tableMetaCache.isOnRDS()
-                                     && isRDSHeartBeat(table.getDbName(), table.getTableName());
+        // if (isHeartBeat || isRDSHeartBeat) {
+        // // ignore mysql.ha_health_check heartbeat rows in RDS mode
+        // return null;
+        // }
+        TableMeta tableMeta = null;
+        if (isRDSHeartBeat) {
+            // RDS-mode mysql.ha_health_check heartbeat rows: the heartbeat
+            // table is normally unreadable, so mock a TableMeta for it
+            FieldMeta idMeta = new FieldMeta("id", "bigint(20)", true, false, "0");
+            FieldMeta typeMeta = new FieldMeta("type", "char(1)", false, true, "0");
+            tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
+        } else if (isHeartBeat) {
+            // AliSQL-mode test.heartbeat rows: same story, the heartbeat
+            // table is normally unreadable, so mock a TableMeta
+            FieldMeta idMeta = new FieldMeta("id", "smallint(6)", false, true, null);
+            FieldMeta typeMeta = new FieldMeta("ts", "int(11)", true, false, null);
+            tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
+        }

-            String fullname = table.getDbName() + "." + table.getTableName();
-            // check name filter
-            if (nameFilter != null && !nameFilter.filter(fullname)) {
-                return null;
+        EntryPosition position = createPosition(event.getHeader());
+        if (tableMetaCache != null && tableMeta == null) {// no mock meta: consult the table meta cache
+            tableMeta = getTableMeta(table.getDbName(), table.getTableName(), true, position);
+            if (tableMeta == null) {
+                if (!filterTableError) {
+                    throw new CanalParseException("not found [" + fullname + "] in db , pls check!");
+                }
+            }
             }
-            if (nameBlackFilter != null && nameBlackFilter.filter(fullname)) {
-                return null;
+        }
+
+        return tableMeta;
+    }
+
+    /**
+     * Convenience overload for the single-threaded parse path: resolves the
+     * TableMeta internally before decoding the row event.
+     */
+    public Entry parseRowsEvent(RowsLogEvent event) {
+        return parseRowsEvent(event, null);
+    }
+
+    public Entry parseRowsEvent(RowsLogEvent event, TableMeta tableMeta) {
+        if (filterRows) {
+            return null;
+        }
+        try {
+            if (tableMeta == null) { // 如果没有外部指定
+                tableMeta = parseRowsEventForTableMeta(event);
             }
 
-            // if (isHeartBeat || isRDSHeartBeat) {
-            // // 忽略rds模式的mysql.ha_health_check心跳数据
-            // return null;
-            // }
-            TableMeta tableMeta = null;
-            if (isRDSHeartBeat) {
-                // 处理rds模式的mysql.ha_health_check心跳数据
-                // 主要RDS的心跳表基本无权限,需要mock一个tableMeta
-                FieldMeta idMeta = new FieldMeta("id", "bigint(20)", true, false, "0");
-                FieldMeta typeMeta = new FieldMeta("type", "char(1)", false, true, "0");
-                tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
-            } else if (isHeartBeat) {
-                // 处理alisql模式的test.heartbeat心跳数据
-                // 心跳表基本无权限,需要mock一个tableMeta
-                FieldMeta idMeta = new FieldMeta("id", "smallint(6)", false, true, null);
-                FieldMeta typeMeta = new FieldMeta("ts", "int(11)", true, false, null);
-                tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
+            if (tableMeta == null) {
+                // 拿不到表结构,执行忽略
+                return null;
             }
 
             EventType eventType = null;
@@ -484,12 +504,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 throw new CanalParseException("unsupport event type :" + event.getHeader().getType());
             }
 
-            Header header = createHeader(binlogFileName,
-                event.getHeader(),
-                table.getDbName(),
-                table.getTableName(),
-                eventType);
-            EntryPosition position = createPosition(event.getHeader());
+            TableMapLogEvent table = event.getTable();
+            Header header = createHeader(event.getHeader(), table.getDbName(), table.getTableName(), eventType);
+
             RowChange.Builder rowChangeBuider = RowChange.newBuilder();
             rowChangeBuider.setTableId(event.getTableId());
             rowChangeBuider.setIsDdl(false);
@@ -498,18 +515,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             RowsLogBuffer buffer = event.getRowsBuf(charset.name());
             BitSet columns = event.getColumns();
             BitSet changeColumns = event.getChangeColumns();
-            boolean tableError = false;
-            if (tableMetaCache != null && tableMeta == null) {// 入错存在table meta
-                                                              // cache
-                tableMeta = getTableMeta(table.getDbName(), table.getTableName(), true, position);
-                if (tableMeta == null) {
-                    tableError = true;
-                    if (!filterTableError) {
-                        throw new CanalParseException("not found [" + fullname + "] in db , pls check!");
-                    }
-                }
-            }
 
+            boolean tableError = false;
             while (buffer.nextOneRow(columns)) {
                 // 处理row记录
                 RowData.Builder rowDataBuilder = RowData.newBuilder();
@@ -539,7 +546,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 logger.warn("table parser error : {}storeValue: {}", entry.toString(), rowChange.toString());
                 return null;
             } else {
-                Entry entry = createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
+                Entry entry = createEntry(header, EntryType.ROWDATA, rowChange.toByteString());
                 return entry;
             }
         } catch (Exception e) {
@@ -548,7 +555,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     }
 
     private EntryPosition createPosition(LogHeader logHeader) {
-        return new EntryPosition(binlogFileName,
+        return new EntryPosition(logHeader.getLogFileName(),
             logHeader.getLogPos(),
             logHeader.getWhen() * 1000L,
             logHeader.getServerId()); // 记录到秒
@@ -641,7 +648,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                     isBinary = true;
                 } else if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "BINARY")) {
                     isBinary = true;
-                } else if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "tinyint(1)")) {
+                } else if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "TINYINT(1)")) {
                     isSingleBit = true;
                 }
             }
@@ -771,7 +778,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     }
 
     private Entry buildQueryEntry(String queryString, LogHeader logHeader, String tableName) {
-        Header header = createHeader(binlogFileName, logHeader, "", tableName, EventType.QUERY);
+        Header header = createHeader(logHeader, "", tableName, EventType.QUERY);
         RowChange.Builder rowChangeBuider = RowChange.newBuilder();
         rowChangeBuider.setSql(queryString);
         rowChangeBuider.setEventType(EventType.QUERY);
@@ -779,19 +786,18 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     }
 
     private Entry buildQueryEntry(String queryString, LogHeader logHeader) {
-        Header header = createHeader(binlogFileName, logHeader, "", "", EventType.QUERY);
+        Header header = createHeader(logHeader, "", "", EventType.QUERY);
         RowChange.Builder rowChangeBuider = RowChange.newBuilder();
         rowChangeBuider.setSql(queryString);
         rowChangeBuider.setEventType(EventType.QUERY);
         return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
     }
 
-    private Header createHeader(String binlogFile, LogHeader logHeader, String schemaName, String tableName,
-                                EventType eventType) {
+    private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType) {
         // header会做信息冗余,方便以后做检索或者过滤
         Header.Builder headerBuilder = Header.newBuilder();
         headerBuilder.setVersion(version);
-        headerBuilder.setLogfileName(binlogFile);
+        headerBuilder.setLogfileName(logHeader.getLogFileName());
         headerBuilder.setLogfileOffset(logHeader.getLogPos() - logHeader.getEventLen());
         headerBuilder.setServerId(logHeader.getServerId());
         headerBuilder.setServerenCode(UTF_8);// 经过java输出后所有的编码为unicode

+ 5 - 0
pom.xml

@@ -263,6 +263,11 @@
                 <artifactId>druid</artifactId>
                 <version>1.1.9</version>
             </dependency>
+            <dependency>
+                <groupId>com.lmax</groupId>
+                <artifactId>disruptor</artifactId>
+                <version>3.4.2</version>
+            </dependency>
             <!-- log -->
             <dependency>
                 <groupId>ch.qos.logback</groupId>