Browse Source

Merge pull request #5 from alibaba/master

merge from alibaba/canal
rewerma 6 years ago
parent
commit
32910b5983
23 changed files with 99 additions and 82 deletions
  1. 1 1
      client-launcher/src/main/bin/startup.sh
  2. 2 2
      client-launcher/src/main/bin/stop.sh
  3. 4 13
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java
  4. 11 7
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
  5. 10 5
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java
  6. 1 3
      dbsync/src/test/java/com/taobao/tddl/dbsync/FetcherPerformanceTest.java
  7. 1 1
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  8. 5 3
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  9. 1 3
      deployer/src/main/resources/kafka.yml
  10. 10 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java
  11. 4 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java
  12. 2 0
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java
  13. 5 5
      parse/src/main/java/com/alibaba/otter/canal/parse/exception/PositionNotFoundException.java
  14. 6 6
      parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerIdNotMatchException.java
  15. 7 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  16. 1 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ParserExceptionHandler.java
  17. 0 8
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  18. 13 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  19. 10 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  20. 1 14
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  21. 2 2
      parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java
  22. 1 1
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/SimpleDdlParserTest.java
  23. 1 1
      store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

+ 1 - 1
client-launcher/src/main/bin/startup.sh

@@ -74,7 +74,7 @@ else
 fi
 
 JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
-CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
+CANAL_OPTS="-DappName=otter-canal-client -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
 
 if [ -e $canal_conf -a -e $logback_configurationFile ]
 then 

+ 2 - 2
client-launcher/src/main/bin/stop.sh

@@ -45,7 +45,7 @@ fi
 
 pid=`cat $pidfile`
 if [ "$pid" == "" ] ; then
-	pid=`get_pid "appName=otter-canal"`
+	pid=`get_pid "appName=otter-canal-client"`
 fi
 
 echo -e "`hostname`: stopping canal $pid ... "
@@ -54,7 +54,7 @@ kill $pid
 LOOPS=0
 while (true); 
 do 
-	gpid=`get_pid "appName=otter-canal" "$pid"`
+	gpid=`get_pid "appName=otter-canal-client" "$pid"`
     if [ "$gpid" == "" ] ; then
     	echo "Oook! cost:$LOOPS"
     	`rm $pidfile`

+ 4 - 13
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java

@@ -4,9 +4,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
 
 /**
@@ -66,18 +64,11 @@ public final class LogContext {
         mapOfTable.clear();
     }
 
-    public final void putGtid(GtidLogEvent logEvent) {
-        if (logEvent != null) {
-            String gtid = logEvent.getSid().toString() + ":" + logEvent.getGno();
-            if (gtidSet == null) {
-                gtid = logEvent.getSid().toString() + ":1-" + logEvent.getGno();
-                gtidSet = MysqlGTIDSet.parse(gtid);
-            }
-            gtidSet.update(gtid);
-        }
-    }
-
     public GTIDSet getGtidSet() {
         return gtidSet;
     }
+
+    public void setGtidSet(GTIDSet gtidSet) {
+        this.gtidSet = gtidSet;
+    }
 }

+ 11 - 7
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -3,10 +3,10 @@ package com.taobao.tddl.dbsync.binlog;
 import java.io.IOException;
 import java.util.BitSet;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.taobao.tddl.dbsync.binlog.event.AppendBlockLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.BeginLoadQueryLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.CreateFileLogEvent;
@@ -47,7 +47,7 @@ import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
 
 /**
  * Implements a binary-log decoder.
- * 
+ *
  * <pre>
  * LogDecoder decoder = new LogDecoder();
  * decoder.handle(...);
@@ -62,7 +62,7 @@ import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
  * while (event != null);
  * // no more events in buffer.
  * </pre>
- * 
+ *
  * @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
  * @version 1.0
  */
@@ -89,7 +89,7 @@ public final class LogDecoder {
 
     /**
      * Decoding an event from binary-log buffer.
-     * 
+     *
      * @return <code>UknownLogEvent</code> if event type is unknown or skipped,
      * <code>null</code> if buffer is not including a full event.
      */
@@ -141,7 +141,7 @@ public final class LogDecoder {
 
     /**
      * Deserialize an event from buffer.
-     * 
+     *
      * @return <code>UknownLogEvent</code> if event type is unknown or skipped.
      */
     public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext context) throws IOException {
@@ -160,6 +160,7 @@ public final class LogDecoder {
             // remove checksum bytes
             buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
         }
+        GTIDSet gtidSet = context.getGtidSet();
         switch (header.getType()) {
             case LogEvent.QUERY_EVENT: {
                 QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
@@ -373,8 +374,11 @@ public final class LogDecoder {
                 GtidLogEvent event = new GtidLogEvent(header, buffer, descriptionEvent);
                 /* updating position in context */
                 logPosition.position = header.getLogPos();
-                // update latest gtid
-                context.putGtid(event);
+                if (gtidSet != null) {
+                    gtidSet.update(event.getGtidStr());
+                    // update latest gtid
+                    header.putGtidStr(gtidSet);
+                }
                 return event;
             }
             case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {

+ 10 - 5
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java

@@ -14,10 +14,10 @@ import com.taobao.tddl.dbsync.binlog.LogEvent;
 public class GtidLogEvent extends LogEvent {
 
     // / Length of the commit_flag in event encoding
-    public static final int ENCODED_FLAG_LENGTH = 1;
+    public static final int ENCODED_FLAG_LENGTH         = 1;
     // / Length of SID in event encoding
-    public static final int ENCODED_SID_LENGTH  = 16;
-    public static final int LOGICAL_TIMESTAMP_TYPE_CODE  = 2;
+    public static final int ENCODED_SID_LENGTH          = 16;
+    public static final int LOGICAL_TIMESTAMP_TYPE_CODE = 2;
 
     private boolean         commitFlag;
     private UUID            sid;
@@ -50,8 +50,6 @@ public class GtidLogEvent extends LogEvent {
             sequenceNumber = buffer.getLong64();
         }
 
-
-
         // ignore gtid info read
         // sid.copy_from((uchar *)ptr_buffer);
         // ptr_buffer+= ENCODED_SID_LENGTH;
@@ -82,4 +80,11 @@ public class GtidLogEvent extends LogEvent {
     public Long getSequenceNumber() {
         return sequenceNumber;
     }
+
+    public String getGtidStr() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(sid.toString()).append(":");
+        sb.append(gno);
+        return sb.toString();
+    }
 }

+ 1 - 3
dbsync/src/test/java/com/taobao/tddl/dbsync/FetcherPerformanceTest.java

@@ -15,9 +15,7 @@ public class FetcherPerformanceTest {
         DirectLogFetcher fetcher = new DirectLogFetcher();
         try {
             Class.forName("com.mysql.jdbc.Driver");
-            Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306",
-                "root",
-                "hello");
+            Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");
             Statement statement = connection.createStatement();
             statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
             statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");

+ 1 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -387,7 +387,7 @@ public class CanalController {
         return config;
     }
 
-    private String getProperty(Properties properties, String key) {
+    public String getProperty(Properties properties, String key) {
         key = StringUtils.trim(key);
         String value = System.getProperty(key);
 

+ 5 - 3
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -3,12 +3,13 @@ package com.alibaba.otter.canal.deployer;
 import java.io.FileInputStream;
 import java.util.Properties;
 
-import com.alibaba.otter.canal.kafka.CanalKafkaStarter;
-import com.alibaba.otter.canal.server.CanalServerStarter;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.kafka.CanalKafkaStarter;
+import com.alibaba.otter.canal.server.CanalServerStarter;
+
 /**
  * canal独立版本启动的入口类
  * 
@@ -55,12 +56,13 @@ public class CanalLauncher {
             });
 
             CanalServerStarter canalServerStarter = null;
-            String serverMode = properties.getProperty(CanalConstants.CANAL_SERVER_MODE, "tcp");
+            String serverMode = controller.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
             if (serverMode.equalsIgnoreCase("kafka")) {
                 canalServerStarter = new CanalKafkaStarter();
             } else if (serverMode.equalsIgnoreCase("rocketMQ")) {
                 // 预留rocketMQ启动
             }
+
             if (canalServerStarter != null) {
                 canalServerStarter.init();
             }

+ 1 - 3
deployer/src/main/resources/kafka.yml

@@ -12,6 +12,4 @@ canalGetTimeout: 100
 canalDestinations:
   - canalDestination: example
     topic: example
-    partition:
-
-
+    partition:

+ 10 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java

@@ -138,6 +138,16 @@ public class BioSocketChannel implements SocketChannel {
         if (socket != null) {
             return socket.getRemoteSocketAddress();
         }
+
+        return null;
+    }
+
+    public SocketAddress getLocalSocketAddress() {
+        Socket socket = this.socket;
+        if (socket != null) {
+            return socket.getLocalSocketAddress();
+        }
+
         return null;
     }
 

+ 4 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java

@@ -216,6 +216,10 @@ public class NettySocketChannel implements SocketChannel {
         return channel != null ? channel.remoteAddress() : null;
     }
 
+    public SocketAddress getLocalSocketAddress() {
+        return channel != null ? channel.localAddress() : null;
+    }
+
     public void close() {
         if (channel != null) {
             channel.close();

+ 2 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -21,5 +21,7 @@ public interface SocketChannel {
 
     public SocketAddress getRemoteSocketAddress();
 
+    public SocketAddress getLocalSocketAddress();
+
     public void close();
 }

+ 5 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/exception/PositionNotFoundException.java

@@ -8,23 +8,23 @@ public class PositionNotFoundException extends CanalParseException {
 
     private static final long serialVersionUID = -7382448928116244017L;
 
-    public PositionNotFoundException(String errorCode) {
+    public PositionNotFoundException(String errorCode){
         super(errorCode);
     }
 
-    public PositionNotFoundException(String errorCode, Throwable cause) {
+    public PositionNotFoundException(String errorCode, Throwable cause){
         super(errorCode, cause);
     }
 
-    public PositionNotFoundException(String errorCode, String errorDesc) {
+    public PositionNotFoundException(String errorCode, String errorDesc){
         super(errorCode, errorDesc);
     }
 
-    public PositionNotFoundException(String errorCode, String errorDesc, Throwable cause) {
+    public PositionNotFoundException(String errorCode, String errorDesc, Throwable cause){
         super(errorCode, errorDesc, cause);
     }
 
-    public PositionNotFoundException(Throwable cause) {
+    public PositionNotFoundException(Throwable cause){
         super(cause);
     }
 }

+ 6 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerIdNotMatchException.java

@@ -6,27 +6,27 @@ import com.alibaba.otter.canal.common.CanalException;
  * @author chengjin.lyf on 2018/8/8 下午1:07
  * @since 1.0.25
  */
-public class ServerIdNotMatchException extends CanalException{
+public class ServerIdNotMatchException extends CanalException {
 
     private static final long serialVersionUID = -6124989280379293953L;
 
-    public ServerIdNotMatchException(String errorCode) {
+    public ServerIdNotMatchException(String errorCode){
         super(errorCode);
     }
 
-    public ServerIdNotMatchException(String errorCode, Throwable cause) {
+    public ServerIdNotMatchException(String errorCode, Throwable cause){
         super(errorCode, cause);
     }
 
-    public ServerIdNotMatchException(String errorCode, String errorDesc) {
+    public ServerIdNotMatchException(String errorCode, String errorDesc){
         super(errorCode, errorDesc);
     }
 
-    public ServerIdNotMatchException(String errorCode, String errorDesc, Throwable cause) {
+    public ServerIdNotMatchException(String errorCode, String errorDesc, Throwable cause){
         super(errorCode, errorDesc, cause);
     }
 
-    public ServerIdNotMatchException(Throwable cause) {
+    public ServerIdNotMatchException(Throwable cause){
         super(cause);
     }
 }

+ 7 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -19,11 +19,13 @@ import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry;
@@ -235,12 +237,14 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         if (parallel) {
                             // build stage processor
                             multiStageCoprocessor = buildMultiStageCoprocessor();
-                            multiStageCoprocessor.start();
-
                             if (isGTIDMode()) {
                                 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
-                                erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), multiStageCoprocessor);
+                                GTIDSet gtidSet = MysqlGTIDSet.parse(startPosition.getGtid());
+                                ((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
+                                multiStageCoprocessor.start();
+                                erosaConnection.dump(gtidSet, multiStageCoprocessor);
                             } else {
+                                multiStageCoprocessor.start();
                                 if (StringUtils.isEmpty(startPosition.getJournalName())
                                     && startPosition.getTimestamp() != null) {
                                     erosaConnection.dump(startPosition.getTimestamp(), multiStageCoprocessor);

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ParserExceptionHandler.java

@@ -5,5 +5,6 @@ package com.alibaba.otter.canal.parse.inbound;
  * @since 1.0.25
  */
 public interface ParserExceptionHandler {
+
     void handle(Throwable e);
 }

+ 0 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -9,7 +9,6 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.parse.CanalEventParser;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
@@ -76,13 +75,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
      * @return
      */
     protected boolean processTableMeta(EntryPosition position) {
-        if (isGTIDMode()) {
-            if (binlogParser instanceof LogEventConvert) {
-                // 记录gtid
-                ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid()));
-            }
-        }
-
         if (tableMetaTSDB != null) {
             if (position.getTimestamp() == null || position.getTimestamp() <= 0) {
                 throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0");

+ 13 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -4,6 +4,7 @@ import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetche
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -179,6 +180,8 @@ public class MysqlConnection implements ErosaConnection {
             fetcher.start(connector.getChannel());
             LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             LogContext context = new LogContext();
+            // fix bug: #890 将gtid传输至context中,供decode使用
+            context.setGtidSet(gtidSet);
             while (fetcher.fetch()) {
                 accumulateReceivedBytes(fetcher.limit());
                 LogEvent event = null;
@@ -252,10 +255,18 @@ public class MysqlConnection implements ErosaConnection {
 
     private void sendRegisterSlave() throws IOException {
         RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
-        cmd.reportHost = authInfo.getAddress().getAddress().getHostAddress();
+        SocketAddress socketAddress = connector.getChannel().getLocalSocketAddress();
+        if (socketAddress == null || !(socketAddress instanceof InetSocketAddress)) {
+            return;
+        }
+
+        InetSocketAddress address = (InetSocketAddress) socketAddress;
+        String host = address.getHostString();
+        int port = address.getPort();
+        cmd.reportHost = host;
+        cmd.reportPort = port;
         cmd.reportPasswd = authInfo.getPassword();
         cmd.reportUser = authInfo.getUsername();
-        cmd.reportPort = authInfo.getAddress().getPort(); // 暂时先用master节点的port
         cmd.serverId = this.slaveId;
         byte[] cmdBody = cmd.toBytes();
 

+ 10 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -7,6 +7,7 @@ import java.util.concurrent.locks.LockSupport;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer;
@@ -63,6 +64,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private String                       destination;
     private volatile CanalParseException exception;
     private AtomicLong                   eventsPublishBlockingTime;
+    private GTIDSet                      gtidSet;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
                                       EventTransactionBuffer transactionBuffer, String destination){
@@ -81,8 +83,8 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
             ringBufferSize,
             new BlockingWaitStrategy());
         int tc = parserThreadCount > 0 ? parserThreadCount : 1;
-        this.parserExecutor = Executors.newFixedThreadPool(tc,
-            new NamedThreadFactory("MultiStageCoprocessor-Parser-" + destination));
+        this.parserExecutor = Executors.newFixedThreadPool(tc, new NamedThreadFactory("MultiStageCoprocessor-Parser-"
+                                                                                      + destination));
 
         this.stageExecutor = Executors.newFixedThreadPool(2, new NamedThreadFactory("MultiStageCoprocessor-other-"
                                                                                     + destination));
@@ -227,6 +229,9 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         public SimpleParserStage(){
             decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
             context = new LogContext();
+            if (gtidSet != null) {
+                context.setGtidSet(gtidSet);
+            }
         }
 
         public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
@@ -447,4 +452,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         this.eventsPublishBlockingTime = eventsPublishBlockingTime;
     }
 
+    public void setGtidSet(GTIDSet gtidSet) {
+        this.gtidSet = gtidSet;
+    }
 }

+ 1 - 14
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -17,7 +17,6 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
@@ -99,13 +98,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private boolean                     filterRows          = false;
     private boolean                     useDruidDdlFilter   = true;
 
-    // latest gtid
-    private GTIDSet                     gtidSet;
-
-    public LogEventConvert(GTIDSet gtidSet){
-        this.gtidSet = gtidSet;
-    }
-
     public LogEventConvert(){
 
     }
@@ -172,10 +164,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
         LogHeader logHeader = logEvent.getHeader();
-        String value = logEvent.getSid().toString() + ":" + logEvent.getGno();
         Pair.Builder builder = Pair.newBuilder();
         builder.setKey("gtid");
-        builder.setValue(value);
+        builder.setValue(logEvent.getGtidStr());
 
         if (logEvent.getLastCommitted() != null) {
             builder.setKey("lastCommitted");
@@ -974,8 +965,4 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     public void setFilterRows(boolean filterRows) {
         this.filterRows = filterRows;
     }
-
-    public void setGtidSet(GTIDSet gtidSet) {
-        this.gtidSet = gtidSet;
-    }
 }

+ 2 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java

@@ -20,12 +20,12 @@ public class MysqlBinlogDumpPerformanceTest {
 
     public static void main(String args[]) {
         final MysqlEventParser controller = new MysqlEventParser();
-        final EntryPosition startPosition = new EntryPosition("mysql-bin.001699", 120L, 100L);
+        final EntryPosition startPosition = new EntryPosition("mysql-bin.000007", 89796293L, 100L);
         controller.setConnectionCharset(Charset.forName("UTF-8"));
         controller.setSlaveId(3344L);
         controller.setDetectingEnable(false);
         controller.setFilterQueryDml(true);
-        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3328), "root", "hello"));
+        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("100.81.154.142", 3306), "canal", "canal"));
         controller.setMasterPosition(startPosition);
         controller.setEnableTsdb(false);
         controller.setDestination("example");

+ 1 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/SimpleDdlParserTest.java

@@ -226,7 +226,7 @@ public class SimpleDdlParserTest {
         Assert.assertEquals("retl", result.getSchemaName());
         Assert.assertEquals("retl_mark", result.getTableName());
 
-        //test index name contains 'on' -- version
+        // test index name contains 'on' -- version
         queryString = "create index schema_new_index_version_s_idx on q_contract_account (contract_id,main_contract_id)";
         result = SimpleDdlParser.parse(queryString, "retl");
         Assert.assertNotNull(result);

+ 1 - 1
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -435,7 +435,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
                     if (batchMode.isMemSize()) {
                         ackMemSize.addAndGet(memsize);
                         // 尝试清空buffer中的内存,将ack之前的内存全部释放掉
-                        for (long index = sequence + 1; index <= next; index++) {
+                        for (long index = sequence + 1; index < next; index++) {
                             entries[getIndex(index)] = null;// 设置为null
                         }
                     }